diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index e409f530249..cf6c4dd34c5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -38,6 +38,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.transaction.KafkaTransactionManager; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. @@ -45,6 +46,7 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; * @author Gary Russell * @author Stephane Nicoll * @author Eddú Meléndez + * @author Nakul Mishra * @since 1.5.0 */ @Configuration @@ -94,8 +96,22 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnMissingBean(ProducerFactory.class) public ProducerFactory kafkaProducerFactory() { - return new DefaultKafkaProducerFactory<>( + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>( this.properties.buildProducerProperties()); + String transactionIdPrefix = this.properties.getProducer() + .getTransactionIdPrefix(); + if (transactionIdPrefix != null) { + factory.setTransactionIdPrefix(transactionIdPrefix); + } + return factory; + } + + @Bean + @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix") + @ConditionalOnMissingBean + public KafkaTransactionManager kafkaTransactionManager( + ProducerFactory producerFactory) { + return new KafkaTransactionManager<>(producerFactory); } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 08dd348531f..55e960a6314 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils; * @author Gary Russell * @author Stephane Nicoll * @author Artem Bilan + * @author Nakul Mishra * @since 1.5.0 */ @ConfigurationProperties(prefix = "spring.kafka") @@ -519,6 +520,11 @@ public class KafkaProperties { */ private Integer retries; + /** + * When non empty, enables transaction support for producer. + */ + private String transactionIdPrefix; + /** * Additional producer-specific properties used to configure the client. */ @@ -600,6 +606,14 @@ public class KafkaProperties { this.retries = retries; } + public String getTransactionIdPrefix() { + return this.transactionIdPrefix; + } + + public void setTransactionIdPrefix(String transactionIdPrefix) { + this.transactionIdPrefix = transactionIdPrefix; + } + public Map getProperties() { return this.properties; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 64d0f12b64a..1fb5e66a840 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -48,6 +48,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.kafka.transaction.KafkaTransactionManager; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; @@ -59,6 +60,7 @@ import static org.mockito.Mockito.mock; * @author Gary Russell * @author Stephane Nicoll * @author Eddú Meléndez + * @author Nakul Mishra */ public class KafkaAutoConfigurationTests { @@ -198,6 +200,8 @@ public class KafkaAutoConfigurationTests { assertThat( context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .isEmpty(); + assertThat(context.getBeansOfType(KafkaTransactionManager.class)) + .isEmpty(); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); }); @@ -256,6 +260,7 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.type=batch", "spring.kafka.jaas.enabled=true", + "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") @@ -297,6 +302,8 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo( AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + assertThat(context.getBeansOfType(KafkaTransactionManager.class)) + .hasSize(1); assertThat(((Map) dfa.getPropertyValue("options"))) .containsExactly(entry("useKeyTab", "true")); }); diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index ec433f382e1..ea907afe4fd 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -998,6 +998,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.producer.ssl.keystore-password= # Store password for the key store file. spring.kafka.producer.ssl.truststore-location= # Location of the trust store file. spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file. + spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer. spring.kafka.producer.value-serializer= # Serializer class for values. spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client. spring.kafka.ssl.key-password= # Password of the private key in the key store file.