Merge pull request #29089 from garyrussell
* pr/29089: Polish "Add transactionIdPrefix Property to KafkaTemplate" Add transactionIdPrefix Property to KafkaTemplate Closes gh-29089
This commit is contained in:
commit
9ff4ea523d
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2020 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
@ -66,10 +67,12 @@ public class KafkaAutoConfiguration {
|
|||
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
|
||||
ProducerListener<Object, Object> kafkaProducerListener,
|
||||
ObjectProvider<RecordMessageConverter> messageConverter) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
|
||||
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
|
||||
kafkaTemplate.setProducerListener(kafkaProducerListener);
|
||||
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
|
||||
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
|
||||
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
|
||||
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
|
||||
return kafkaTemplate;
|
||||
}
|
||||
|
||||
|
|
|
@ -824,6 +824,12 @@ public class KafkaProperties {
|
|||
*/
|
||||
private String defaultTopic;
|
||||
|
||||
/**
|
||||
* Transaction id prefix, override the transaction id prefix in the producer
|
||||
* factory.
|
||||
*/
|
||||
private String transactionIdPrefix;
|
||||
|
||||
public String getDefaultTopic() {
|
||||
return this.defaultTopic;
|
||||
}
|
||||
|
@ -832,6 +838,14 @@ public class KafkaProperties {
|
|||
this.defaultTopic = defaultTopic;
|
||||
}
|
||||
|
||||
public String getTransactionIdPrefix() {
|
||||
return this.transactionIdPrefix;
|
||||
}
|
||||
|
||||
public void setTransactionIdPrefix(String transactionIdPrefix) {
|
||||
this.transactionIdPrefix = transactionIdPrefix;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Listener {
|
||||
|
|
|
@ -384,6 +384,7 @@ class KafkaAutoConfigurationTests {
|
|||
void listenerProperties() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
|
||||
"spring.kafka.template.transaction-id-prefix=txOverride",
|
||||
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
|
||||
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
|
||||
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
|
||||
|
@ -406,6 +407,7 @@ class KafkaAutoConfigurationTests {
|
|||
assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(MessagingMessageConverter.class);
|
||||
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory);
|
||||
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
|
||||
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride");
|
||||
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
|
||||
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
|
||||
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
|
||||
|
|
Loading…
Reference in New Issue