Provide programmatic alternative to @ConcurrencyLimit

Closes gh-35460

Signed-off-by: Geonhu Park <parkgeonhu@gmail.com>
This commit is contained in:
Geonhu Park 2025-09-26 14:59:09 +09:00
parent 60673a0e32
commit cc7dc955ac
5 changed files with 228 additions and 0 deletions

View File

@ -0,0 +1,72 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.concurrency;
import org.jspecify.annotations.Nullable;
import org.springframework.util.ConcurrencyThrottleSupport;
/**
* Template-style API that throttles concurrent executions of user-provided callbacks
* according to a configurable concurrency limit.
*
* <p>Blocking semantics are identical to {@link ConcurrencyThrottleSupport}:
* when the configured limit is reached, additional callers will block until a
* permit becomes available.
*
* <p>The default concurrency limit of this template is 1.
*
* @author Geonhu Park
* @since 7.1
* @see ConcurrencyThrottleSupport
*/
@SuppressWarnings("serial")
public class ConcurrencyLimitTemplate extends ConcurrencyThrottleSupport {
/**
* Create a default {@code ConcurrencyLimitTemplate}
* with concurrency limit 1.
*/
public ConcurrencyLimitTemplate() {
this(1);
}
/**
* Create a {@code ConcurrencyThrottleInterceptor}
* with the given concurrency limit.
*/
public ConcurrencyLimitTemplate(int concurrencyLimit) {
setConcurrencyLimit(concurrencyLimit);
}
/**
* Execute the supplied callback under the configured concurrency limit.
* @param concurrencyLimited the unit of work to run
* @param <R> the result type (nullable)
* @return the callback's result (possibly {@code null})
* @throws Throwable any exception thrown by the callback
*/
public <R extends @Nullable Object> @Nullable R execute(ConcurrencyLimited<R> concurrencyLimited) throws Throwable {
beforeAccess();
try {
return concurrencyLimited.execute();
}
finally {
afterAccess();
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.concurrency;
import org.jspecify.annotations.Nullable;
/**
* Functional callback representing a single unit of work to be executed under
* the concurrency throttling of a {@link ConcurrencyLimitTemplate}.
*
* @author Geonhu Park
* @since 7.1
* @param <R> the result type (nullable)
* @see ConcurrencyLimitTemplate
*/
@FunctionalInterface
public interface ConcurrencyLimited<R extends @Nullable Object> {
/**
* Execute the concurrency-limited operation.
* @return the result (may be {@code null})
* @throws Throwable any error from the underlying operation
*/
R execute() throws Throwable;
}

View File

@ -0,0 +1,5 @@
/**
* Concurrency limiting (throttling) support via {@link org.springframework.core.concurrency.ConcurrencyLimitTemplate}
* and {@link org.springframework.core.concurrency.ConcurrencyLimited}.
*/
package org.springframework.core.concurrency;

View File

@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
* @see #beforeAccess()
* @see #afterAccess()
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
* @see org.springframework.core.concurrency.ConcurrencyLimitTemplate
* @see java.io.Serializable
*/
@SuppressWarnings("serial")

View File

@ -0,0 +1,111 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.concurrency;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests for {@link ConcurrencyLimitTemplate}.
*
* @author Geonhu Park
*/
class ConcurrencyLimitTemplateTests {
private static final Log logger = LogFactory.getLog(ConcurrencyLimitTemplateTests.class);
private static final int NR_OF_THREADS = 100;
private static final int NR_OF_ITERATIONS = 1000;
@ParameterizedTest
@ValueSource(ints = {1, 10})
void multipleThreadsWithLimit(int concurrencyLimit) {
ConcurrencyLimitTemplate template = new ConcurrencyLimitTemplate(concurrencyLimit);
Thread[] threads = new Thread[NR_OF_THREADS];
for (int i = 0; i < NR_OF_THREADS; i++) {
threads[i] = new ConcurrencyThread(template, null);
threads[i].start();
}
for (int i = 0; i < NR_OF_THREADS / 10; i++) {
try {
Thread.sleep(5);
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
threads[i] = new ConcurrencyThread(template,
(i % 2 == 0 ? new OutOfMemoryError() : new IllegalStateException()));
threads[i].start();
}
for (Thread t : threads) {
try {
t.join();
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
private static class ConcurrencyThread extends Thread {
private final ConcurrencyLimitTemplate template;
private final Throwable ex;
ConcurrencyThread(ConcurrencyLimitTemplate template, Throwable ex) {
this.template = template;
this.ex = ex;
}
@Override
public void run() {
if (this.ex != null) {
try {
this.template.execute(() -> {
throw this.ex;
});
}
catch (RuntimeException | Error err) {
if (err == this.ex) {
logger.info("Expected exception thrown", err);
}
else {
ex.printStackTrace();
}
}
catch (Throwable th) {
th.printStackTrace();
}
}
else {
for (int i = 0; i < NR_OF_ITERATIONS; i++) {
try {
this.template.execute(() -> null);
}
catch (Throwable th) {
th.printStackTrace();
break;
}
}
}
}
}
}