Add option to allow Spring Batch custom isolation levels

See gh-28859
This commit is contained in:
stibi 2021-11-30 08:48:51 +01:00 committed by Stephane Nicoll
parent fc794f17e1
commit a8d1d3104e
6 changed files with 123 additions and 6 deletions

View File

@ -139,7 +139,7 @@ public class BasicBatchConfigurer implements BatchConfigurer, InitializingBean {
* @return the isolation level or {@code null} to use the default * @return the isolation level or {@code null} to use the default
*/ */
protected String determineIsolationLevel() { protected String determineIsolationLevel() {
return null; return this.properties.getJdbc().getIsolationLevelForCreate();
} }
protected PlatformTransactionManager createTransactionManager() { protected PlatformTransactionManager createTransactionManager() {

View File

@ -87,6 +87,11 @@ public class BatchProperties {
*/ */
private DatabaseInitializationMode initializeSchema = DatabaseInitializationMode.EMBEDDED; private DatabaseInitializationMode initializeSchema = DatabaseInitializationMode.EMBEDDED;
/**
* Transaction isolation level to use when creating job meta-data for new jobs.
*/
private String isolationLevelForCreate;
public String getSchema() { public String getSchema() {
return this.schema; return this.schema;
} }
@ -119,6 +124,14 @@ public class BatchProperties {
this.initializeSchema = initializeSchema; this.initializeSchema = initializeSchema;
} }
public String getIsolationLevelForCreate() {
return this.isolationLevelForCreate;
}
public void setIsolationLevelForCreate(String isolationLevelForCreate) {
this.isolationLevelForCreate = isolationLevelForCreate;
}
} }
} }

View File

@ -38,6 +38,8 @@ public class JpaBatchConfigurer extends BasicBatchConfigurer {
private final EntityManagerFactory entityManagerFactory; private final EntityManagerFactory entityManagerFactory;
private final String isolationLevelForCreate;
/** /**
* Create a new {@link BasicBatchConfigurer} instance. * Create a new {@link BasicBatchConfigurer} instance.
* @param properties the batch properties * @param properties the batch properties
@ -50,12 +52,18 @@ public class JpaBatchConfigurer extends BasicBatchConfigurer {
TransactionManagerCustomizers transactionManagerCustomizers, EntityManagerFactory entityManagerFactory) { TransactionManagerCustomizers transactionManagerCustomizers, EntityManagerFactory entityManagerFactory) {
super(properties, dataSource, transactionManagerCustomizers); super(properties, dataSource, transactionManagerCustomizers);
this.entityManagerFactory = entityManagerFactory; this.entityManagerFactory = entityManagerFactory;
this.isolationLevelForCreate = properties.getJdbc().getIsolationLevelForCreate();
} }
@Override @Override
protected String determineIsolationLevel() { protected String determineIsolationLevel() {
logger.warn("JPA does not support custom isolation levels, so locks may not be taken when launching Jobs"); if (this.isolationLevelForCreate == null) {
return "ISOLATION_DEFAULT"; logger.warn(
"JPA does not support custom isolation levels, so locks may not be taken when launching Jobs. Define spring.batch.jdbc.isolation-level-for-create property to force a custom isolation level.");
return "ISOLATION_DEFAULT";
}
return this.isolationLevelForCreate;
} }
@Override @Override

View File

@ -208,6 +208,8 @@ class BatchAutoConfigurationTests {
// level) // level)
assertThat(context.getBean(JobRepository.class).getLastJobExecution("job", new JobParameters())) assertThat(context.getBean(JobRepository.class).getLastJobExecution("job", new JobParameters()))
.isNull(); .isNull();
assertThat(context.getBean(JobRepository.class))
.satisfies(JobRepositoryTestingSupport.isolationLevelRequirements("ISOLATION_DEFAULT"));
}); });
} }
@ -232,6 +234,16 @@ class BatchAutoConfigurationTests {
}); });
} }
@Test
void testCustomIsolationLevelForCreate() {
this.contextRunner
.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class,
HibernateJpaAutoConfiguration.class)
.withPropertyValues("spring.batch.jdbc.isolation-level-for-create:ISOLATION_READ_COMMITTED")
.run((context) -> assertThat(context.getBean(JobRepository.class))
.satisfies(JobRepositoryTestingSupport.isolationLevelRequirements("ISOLATION_READ_COMMITTED")));
}
@Test @Test
void testCustomizeJpaTransactionManagerUsingProperties() { void testCustomizeJpaTransactionManagerUsingProperties() {
this.contextRunner this.contextRunner

View File

@ -27,6 +27,7 @@ import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.TestAutoConfigurationPackage; import org.springframework.boot.autoconfigure.TestAutoConfigurationPackage;
import org.springframework.boot.autoconfigure.batch.BatchProperties.Jdbc;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration; import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.test.City; import org.springframework.boot.autoconfigure.orm.jpa.test.City;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration; import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
@ -37,6 +38,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.from;
/** /**
* Tests for {@link BatchAutoConfiguration} when JPA is not on the classpath. * Tests for {@link BatchAutoConfiguration} when JPA is not on the classpath.
@ -59,18 +61,24 @@ class BatchAutoConfigurationWithoutJpaTests {
assertThat(context).hasSingleBean(PlatformTransactionManager.class); assertThat(context).hasSingleBean(PlatformTransactionManager.class);
assertThat(context.getBean(PlatformTransactionManager.class).toString()) assertThat(context.getBean(PlatformTransactionManager.class).toString())
.contains("DataSourceTransactionManager"); .contains("DataSourceTransactionManager");
assertThat(context.getBean(BatchProperties.class).getJdbc().getInitializeSchema()) assertThat(context.getBean(BatchProperties.class).getJdbc())
.isEqualTo(DatabaseInitializationMode.EMBEDDED); .returns("classpath:org/springframework/batch/core/schema-@@platform@@.sql",
from(Jdbc::getSchema))
.returns(DatabaseInitializationMode.EMBEDDED, from(Jdbc::getInitializeSchema))
.returns(null, from(Jdbc::getIsolationLevelForCreate));
assertThat(new JdbcTemplate(context.getBean(DataSource.class)) assertThat(new JdbcTemplate(context.getBean(DataSource.class))
.queryForList("select * from BATCH_JOB_EXECUTION")).isEmpty(); .queryForList("select * from BATCH_JOB_EXECUTION")).isEmpty();
assertThat(context.getBean(JobExplorer.class).findRunningJobExecutions("test")).isEmpty(); assertThat(context.getBean(JobExplorer.class).findRunningJobExecutions("test")).isEmpty();
assertThat(context.getBean(JobRepository.class).getLastJobExecution("test", new JobParameters())) assertThat(context.getBean(JobRepository.class).getLastJobExecution("test", new JobParameters()))
.isNull(); .isNull();
assertThat(context.getBean(JobRepository.class)).satisfies(
JobRepositoryTestingSupport.isolationLevelRequirements("ISOLATION_SERIALIZABLE"));
}); });
} }
@Test @Test
void jdbcWithCustomPrefix() { void jdbcWithCustomSettings() {
this.contextRunner.withUserConfiguration(DefaultConfiguration.class, EmbeddedDataSourceConfiguration.class) this.contextRunner.withUserConfiguration(DefaultConfiguration.class, EmbeddedDataSourceConfiguration.class)
.withPropertyValues("spring.datasource.generate-unique-name=true", .withPropertyValues("spring.datasource.generate-unique-name=true",
"spring.batch.jdbc.schema:classpath:batch/custom-schema-hsql.sql", "spring.batch.jdbc.schema:classpath:batch/custom-schema-hsql.sql",

View File

@ -0,0 +1,76 @@
/*
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.batch;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;
import org.aopalliance.aop.Advice;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.springframework.aop.Advisor;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
final class JobRepositoryTestingSupport {
private JobRepositoryTestingSupport() {
}
static Consumer<JobRepository> isolationLevelRequirements(String isolationLevel) {
return (jobRepository) ->
// jobRepository is proxied twice, the inner proxy has the transaction advice.
// This logic does not assume anything about proxy hierarchy, but it does about
// the advice itself.
assertThat(getTransactionAdvices(jobRepository))
.anySatisfy((advice) -> assertThat(advice).extracting("transactionAttributeSource")
.extracting(Object::toString, as(InstanceOfAssertFactories.STRING))
.contains("create*=PROPAGATION_REQUIRES_NEW," + isolationLevel)
.contains("getLastJobExecution*=PROPAGATION_REQUIRES_NEW," + isolationLevel));
}
private static Stream<Advice> getTransactionAdvices(Object candidate) {
Builder<Advice> builder = Stream.builder();
getTransactionAdvices(candidate, builder);
return builder.build();
}
private static void getTransactionAdvices(Object candidate, Builder<Advice> builder) {
try {
if (AopUtils.isAopProxy(candidate) && candidate instanceof Advised) {
Arrays.stream(((Advised) candidate).getAdvisors()).map(Advisor::getAdvice)
.filter(TransactionAspectSupport.class::isInstance).forEach(builder::add);
Object target = ((Advised) candidate).getTargetSource().getTarget();
if (target != null) {
getTransactionAdvices(target, builder);
}
}
}
catch (Exception ex) {
throw new IllegalStateException("Failed to unwrap proxied object", ex);
}
}
}