Merge pull request #6961 from garyrussell:kafkaAutoConfig
* pr/6961: Polish "Apache Kafka support" contribution Add Apache Kafka support
This commit is contained in:
commit
cc07f56c87
|
|
@ -472,6 +472,11 @@
|
|||
<artifactId>spring-rabbit</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-spring-service-connector</artifactId>
|
||||
|
|
@ -605,6 +610,18 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-junit-runners</artifactId>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
|
||||
/**
|
||||
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.5.0
|
||||
*/
|
||||
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
||||
|
||||
private KafkaProperties properties;
|
||||
|
||||
/**
|
||||
* Set the {@link KafkaProperties} to use.
|
||||
* @param properties the properties
|
||||
*/
|
||||
void setKafkaProperties(KafkaProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the specified Kafka listener container factory. The factory can be
|
||||
* further tuned and default settings can be overridden.
|
||||
* @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
|
||||
* instance to configure
|
||||
* @param consumerFactory the {@link ConsumerFactory} to use
|
||||
*/
|
||||
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
|
||||
ConsumerFactory<Object, Object> consumerFactory) {
|
||||
listenerContainerFactory.setConsumerFactory(consumerFactory);
|
||||
Listener container = this.properties.getListener();
|
||||
ContainerProperties containerProperties = listenerContainerFactory.getContainerProperties();
|
||||
if (container.getAckMode() != null) {
|
||||
containerProperties.setAckMode(container.getAckMode());
|
||||
}
|
||||
if (container.getAckCount() != null) {
|
||||
containerProperties.setAckCount(container.getAckCount());
|
||||
}
|
||||
if (container.getAckTime() != null) {
|
||||
containerProperties.setAckTime(container.getAckTime());
|
||||
}
|
||||
if (container.getPollTimeout() != null) {
|
||||
containerProperties.setPollTimeout(container.getPollTimeout());
|
||||
}
|
||||
if (container.getConcurrency() != null) {
|
||||
listenerContainerFactory.setConcurrency(container.getConcurrency());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.config.KafkaListenerConfigUtils;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
|
||||
/**
|
||||
* Configuration for Kafka annotation-driven support.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.5.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(EnableKafka.class)
|
||||
class KafkaAnnotationDrivenConfiguration {
|
||||
|
||||
private final KafkaProperties properties;
|
||||
|
||||
KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
|
||||
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
|
||||
configurer.setKafkaProperties(this.properties);
|
||||
return configurer;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
|
||||
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
|
||||
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
|
||||
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
|
||||
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<Object, Object>();
|
||||
configurer.configure(factory, kafkaConsumerFactory);
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
||||
@EnableKafka
|
||||
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
|
||||
protected static class EnableKafkaConfiguration {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
|
||||
/**
|
||||
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.5.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(KafkaTemplate.class)
|
||||
@EnableConfigurationProperties(KafkaProperties.class)
|
||||
@Import(KafkaAnnotationDrivenConfiguration.class)
|
||||
public class KafkaAutoConfiguration {
|
||||
|
||||
private final KafkaProperties properties;
|
||||
|
||||
public KafkaAutoConfiguration(KafkaProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(KafkaTemplate.class)
|
||||
public KafkaTemplate<?, ?> kafkaTemplate(
|
||||
ProducerFactory<Object, Object> kafkaProducerFactory,
|
||||
ProducerListener<Object, Object> kafkaProducerListener) {
|
||||
KafkaTemplate<Object, Object> kafkaTemplate =
|
||||
new KafkaTemplate<Object, Object>(kafkaProducerFactory);
|
||||
kafkaTemplate.setProducerListener(kafkaProducerListener);
|
||||
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
|
||||
return kafkaTemplate;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(ProducerListener.class)
|
||||
public ProducerListener<Object, Object> kafkaProducerListener() {
|
||||
return new LoggingProducerListener<Object, Object>();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(ConsumerFactory.class)
|
||||
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
|
||||
return new DefaultKafkaConsumerFactory<Object, Object>(
|
||||
this.properties.buildConsumerProperties());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(ProducerFactory.class)
|
||||
public ProducerFactory<?, ?> kafkaProducerFactory() {
|
||||
return new DefaultKafkaProducerFactory<Object, Object>(
|
||||
this.properties.buildProducerProperties());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,723 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
|
||||
/**
|
||||
* Configuration properties for Spring for Apache Kafka.
|
||||
* <p/>
|
||||
* Users should refer to kafka documentation for complete descriptions of these
|
||||
* properties.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @author Stephane Nicoll
|
||||
* @since 1.5.0
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.kafka")
|
||||
public class KafkaProperties {
|
||||
|
||||
private final Consumer consumer = new Consumer();
|
||||
|
||||
private final Producer producer = new Producer();
|
||||
|
||||
private final Listener listener = new Listener();
|
||||
|
||||
private final Template template = new Template();
|
||||
|
||||
private final Ssl ssl = new Ssl();
|
||||
|
||||
// Apache Kafka Common Properties
|
||||
|
||||
/**
|
||||
* Comma-delimited list of host:port pairs to use for establishing the initial
|
||||
* connection to the Kafka cluster.
|
||||
*/
|
||||
private List<String> bootstrapServers = new ArrayList<String>(Collections.singletonList(
|
||||
"localhost:9092"));
|
||||
|
||||
/**
|
||||
* Id to pass to the server when making requests; used for server-side logging.
|
||||
*/
|
||||
private String clientId;
|
||||
|
||||
public Consumer getConsumer() {
|
||||
return this.consumer;
|
||||
}
|
||||
|
||||
public Producer getProducer() {
|
||||
return this.producer;
|
||||
}
|
||||
|
||||
public Listener getListener() {
|
||||
return this.listener;
|
||||
}
|
||||
|
||||
public Ssl getSsl() {
|
||||
return this.ssl;
|
||||
}
|
||||
|
||||
public Template getTemplate() {
|
||||
return this.template;
|
||||
}
|
||||
|
||||
public List<String> getBootstrapServers() {
|
||||
return this.bootstrapServers;
|
||||
}
|
||||
|
||||
public void setBootstrapServers(List<String> bootstrapServers) {
|
||||
this.bootstrapServers = bootstrapServers;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return this.clientId;
|
||||
}
|
||||
|
||||
public void setClientId(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
private Map<String, Object> buildCommonProperties() {
|
||||
Map<String, Object> properties = new HashMap<String, Object>();
|
||||
if (this.bootstrapServers != null) {
|
||||
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
|
||||
}
|
||||
if (this.clientId != null) {
|
||||
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
|
||||
}
|
||||
if (this.ssl.getKeyPassword() != null) {
|
||||
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
|
||||
}
|
||||
if (this.ssl.getKeystoreLocation() != null) {
|
||||
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
|
||||
resourceToPath(this.ssl.getKeystoreLocation()));
|
||||
}
|
||||
if (this.ssl.getKeystorePassword() != null) {
|
||||
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
|
||||
this.ssl.getKeystorePassword());
|
||||
}
|
||||
if (this.ssl.getTruststoreLocation() != null) {
|
||||
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
|
||||
resourceToPath(this.ssl.getTruststoreLocation()));
|
||||
}
|
||||
if (this.ssl.getTruststorePassword() != null) {
|
||||
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
|
||||
this.ssl.getTruststorePassword());
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an initial map of consumer properties from the state of this instance.
|
||||
* <p>This allows you to add additional properties, if necessary, and override the
|
||||
* default kafkaConsumerFactory bean.
|
||||
* @return the consumer properties initialized with the customizations defined on
|
||||
* this instance
|
||||
*/
|
||||
public Map<String, Object> buildConsumerProperties() {
|
||||
Map<String, Object> props = buildCommonProperties();
|
||||
props.putAll(this.consumer.buildProperties());
|
||||
return props;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an initial map of producer properties from the state of this instance.
|
||||
* <p>This allows you to add additional properties, if necessary, and override the
|
||||
* default kafkaProducerFactory bean.
|
||||
* @return the producer properties initialized with the customizations defined on
|
||||
* this instance
|
||||
*/
|
||||
public Map<String, Object> buildProducerProperties() {
|
||||
Map<String, Object> props = buildCommonProperties();
|
||||
props.putAll(this.producer.buildProperties());
|
||||
return props;
|
||||
}
|
||||
|
||||
private static String resourceToPath(Resource resource) {
|
||||
try {
|
||||
return resource.getFile().getAbsolutePath();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IllegalStateException(String.format(
|
||||
"Resource '%s' must be on a file system", resource), ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Consumer {
|
||||
|
||||
private final Ssl ssl = new Ssl();
|
||||
|
||||
/**
|
||||
* Frequency in milliseconds that the consumer offsets are auto-committed to
|
||||
* Kafka if 'enable.auto.commit' true.
|
||||
*/
|
||||
private Long autoCommitInterval;
|
||||
|
||||
/**
|
||||
* What to do when there is no initial offset in Kafka or if the current offset
|
||||
* does not exist any more on the server.
|
||||
*/
|
||||
private String autoOffsetReset;
|
||||
|
||||
/**
|
||||
* Comma-delimited list of host:port pairs to use for establishing the initial
|
||||
* connection to the Kafka cluster.
|
||||
*/
|
||||
private List<String> bootstrapServers;
|
||||
|
||||
/**
|
||||
* Id to pass to the server when making requests; used for server-side logging.
|
||||
*/
|
||||
private String clientId;
|
||||
|
||||
/**
|
||||
* If true the consumer's offset will be periodically committed in the background.
|
||||
*/
|
||||
private Boolean enableAutoCommit;
|
||||
|
||||
/**
|
||||
* Maximum amount of time in milliseconds the server will block before answering
|
||||
* the fetch request if there isn't sufficient data to immediately satisfy the
|
||||
* requirement given by "fetch.min.bytes".
|
||||
*/
|
||||
private Integer fetchMaxWait;
|
||||
|
||||
/**
|
||||
* Minimum amount of data the server should return for a fetch request in bytes.
|
||||
*/
|
||||
private Integer fetchMinSize;
|
||||
|
||||
/**
|
||||
* Unique string that identifies the consumer group this consumer belongs to.
|
||||
*/
|
||||
private String groupId;
|
||||
|
||||
/**
|
||||
* Expected time in milliseconds between heartbeats to the consumer coordinator.
|
||||
*/
|
||||
private Integer heartbeatInterval;
|
||||
|
||||
/**
|
||||
* Deserializer class for keys.
|
||||
*/
|
||||
private Class<?> keyDeserializer = StringDeserializer.class;
|
||||
|
||||
/**
|
||||
* Deserializer class for values.
|
||||
*/
|
||||
private Class<?> valueDeserializer = StringDeserializer.class;
|
||||
|
||||
public Ssl getSsl() {
|
||||
return this.ssl;
|
||||
}
|
||||
|
||||
public Long getAutoCommitInterval() {
|
||||
return this.autoCommitInterval;
|
||||
}
|
||||
|
||||
public void setAutoCommitInterval(Long autoCommitInterval) {
|
||||
this.autoCommitInterval = autoCommitInterval;
|
||||
}
|
||||
|
||||
public String getAutoOffsetReset() {
|
||||
return this.autoOffsetReset;
|
||||
}
|
||||
|
||||
public void setAutoOffsetReset(String autoOffsetReset) {
|
||||
this.autoOffsetReset = autoOffsetReset;
|
||||
}
|
||||
|
||||
public List<String> getBootstrapServers() {
|
||||
return this.bootstrapServers;
|
||||
}
|
||||
|
||||
public void setBootstrapServers(List<String> bootstrapServers) {
|
||||
this.bootstrapServers = bootstrapServers;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return this.clientId;
|
||||
}
|
||||
|
||||
public void setClientId(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public Boolean getEnableAutoCommit() {
|
||||
return this.enableAutoCommit;
|
||||
}
|
||||
|
||||
public void setEnableAutoCommit(Boolean enableAutoCommit) {
|
||||
this.enableAutoCommit = enableAutoCommit;
|
||||
}
|
||||
|
||||
public Integer getFetchMaxWait() {
|
||||
return this.fetchMaxWait;
|
||||
}
|
||||
|
||||
public void setFetchMaxWait(Integer fetchMaxWait) {
|
||||
this.fetchMaxWait = fetchMaxWait;
|
||||
}
|
||||
|
||||
public Integer getFetchMinSize() {
|
||||
return this.fetchMinSize;
|
||||
}
|
||||
|
||||
public void setFetchMinSize(Integer fetchMinSize) {
|
||||
this.fetchMinSize = fetchMinSize;
|
||||
}
|
||||
|
||||
public String getGroupId() {
|
||||
return this.groupId;
|
||||
}
|
||||
|
||||
public void setGroupId(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
public Integer getHeartbeatInterval() {
|
||||
return this.heartbeatInterval;
|
||||
}
|
||||
|
||||
public void setHeartbeatInterval(Integer heartbeatInterval) {
|
||||
this.heartbeatInterval = heartbeatInterval;
|
||||
}
|
||||
|
||||
public Class<?> getKeyDeserializer() {
|
||||
return this.keyDeserializer;
|
||||
}
|
||||
|
||||
public void setKeyDeserializer(Class<?> keyDeserializer) {
|
||||
this.keyDeserializer = keyDeserializer;
|
||||
}
|
||||
|
||||
public Class<?> getValueDeserializer() {
|
||||
return this.valueDeserializer;
|
||||
}
|
||||
|
||||
public void setValueDeserializer(Class<?> valueDeserializer) {
|
||||
this.valueDeserializer = valueDeserializer;
|
||||
}
|
||||
|
||||
public Map<String, Object> buildProperties() {
|
||||
Map<String, Object> properties = new HashMap<String, Object>();
|
||||
if (this.autoCommitInterval != null) {
|
||||
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval);
|
||||
}
|
||||
if (this.autoOffsetReset != null) {
|
||||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
|
||||
}
|
||||
if (this.bootstrapServers != null) {
|
||||
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
|
||||
}
|
||||
if (this.clientId != null) {
|
||||
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId);
|
||||
}
|
||||
if (this.enableAutoCommit != null) {
|
||||
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
|
||||
}
|
||||
if (this.fetchMaxWait != null) {
|
||||
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWait);
|
||||
}
|
||||
if (this.fetchMinSize != null) {
|
||||
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
|
||||
}
|
||||
if (this.groupId != null) {
|
||||
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
|
||||
}
|
||||
if (this.heartbeatInterval != null) {
|
||||
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval);
|
||||
}
|
||||
if (this.keyDeserializer != null) {
|
||||
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
|
||||
}
|
||||
if (this.ssl.getKeyPassword() != null) {
|
||||
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
|
||||
}
|
||||
if (this.ssl.getKeystoreLocation() != null) {
|
||||
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
|
||||
}
|
||||
if (this.ssl.getKeystorePassword() != null) {
|
||||
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
|
||||
}
|
||||
if (this.ssl.getTruststoreLocation() != null) {
|
||||
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
|
||||
resourceToPath(this.ssl.getTruststoreLocation()));
|
||||
}
|
||||
if (this.ssl.getTruststorePassword() != null) {
|
||||
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
|
||||
}
|
||||
if (this.valueDeserializer != null) {
|
||||
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer);
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Producer {
|
||||
|
||||
private final Ssl ssl = new Ssl();
|
||||
|
||||
/**
|
||||
* Number of acknowledgments the producer requires the leader to have received
|
||||
* before considering a request complete.
|
||||
*/
|
||||
private String acks;
|
||||
|
||||
/**
|
||||
* Number of records to batch before sending.
|
||||
*/
|
||||
private Integer batchSize;
|
||||
|
||||
/**
|
||||
* Comma-delimited list of host:port pairs to use for establishing the initial
|
||||
* connection to the Kafka cluster.
|
||||
*/
|
||||
private List<String> bootstrapServers;
|
||||
|
||||
/**
|
||||
* Total bytes of memory the producer can use to buffer records waiting to be
|
||||
* sent to the server.
|
||||
*/
|
||||
private Long bufferMemory;
|
||||
|
||||
/**
|
||||
* Id to pass to the server when making requests; used for server-side logging.
|
||||
*/
|
||||
private String clientId;
|
||||
|
||||
/**
|
||||
* Compression type for all data generated by the producer.
|
||||
*/
|
||||
private String compressionType;
|
||||
|
||||
/**
|
||||
* Serializer class for keys.
|
||||
*/
|
||||
private Class<?> keySerializer = StringSerializer.class;
|
||||
|
||||
/**
|
||||
* Serializer class for values.
|
||||
*/
|
||||
private Class<?> valueSerializer = StringSerializer.class;
|
||||
|
||||
/**
|
||||
* When greater than zero, enables retrying of failed sends.
|
||||
*/
|
||||
private Integer retries;
|
||||
|
||||
public Ssl getSsl() {
|
||||
return this.ssl;
|
||||
}
|
||||
|
||||
public String getAcks() {
|
||||
return this.acks;
|
||||
}
|
||||
|
||||
public void setAcks(String acks) {
|
||||
this.acks = acks;
|
||||
}
|
||||
|
||||
public Integer getBatchSize() {
|
||||
return this.batchSize;
|
||||
}
|
||||
|
||||
public void setBatchSize(Integer batchSize) {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public List<String> getBootstrapServers() {
|
||||
return this.bootstrapServers;
|
||||
}
|
||||
|
||||
public void setBootstrapServers(List<String> bootstrapServers) {
|
||||
this.bootstrapServers = bootstrapServers;
|
||||
}
|
||||
|
||||
public Long getBufferMemory() {
|
||||
return this.bufferMemory;
|
||||
}
|
||||
|
||||
public void setBufferMemory(Long bufferMemory) {
|
||||
this.bufferMemory = bufferMemory;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return this.clientId;
|
||||
}
|
||||
|
||||
public void setClientId(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public String getCompressionType() {
|
||||
return this.compressionType;
|
||||
}
|
||||
|
||||
public void setCompressionType(String compressionType) {
|
||||
this.compressionType = compressionType;
|
||||
}
|
||||
|
||||
public Class<?> getKeySerializer() {
|
||||
return this.keySerializer;
|
||||
}
|
||||
|
||||
public void setKeySerializer(Class<?> keySerializer) {
|
||||
this.keySerializer = keySerializer;
|
||||
}
|
||||
|
||||
public Class<?> getValueSerializer() {
|
||||
return this.valueSerializer;
|
||||
}
|
||||
|
||||
public void setValueSerializer(Class<?> valueSerializer) {
|
||||
this.valueSerializer = valueSerializer;
|
||||
}
|
||||
|
||||
public Integer getRetries() {
|
||||
return this.retries;
|
||||
}
|
||||
|
||||
public void setRetries(Integer retries) {
|
||||
this.retries = retries;
|
||||
}
|
||||
|
||||
public Map<String, Object> buildProperties() {
|
||||
Map<String, Object> properties = new HashMap<String, Object>();
|
||||
if (this.acks != null) {
|
||||
properties.put(ProducerConfig.ACKS_CONFIG, this.acks);
|
||||
}
|
||||
if (this.batchSize != null) {
|
||||
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
|
||||
}
|
||||
if (this.bootstrapServers != null) {
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
|
||||
}
|
||||
if (this.bufferMemory != null) {
|
||||
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
|
||||
}
|
||||
if (this.clientId != null) {
|
||||
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
|
||||
}
|
||||
if (this.compressionType != null) {
|
||||
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType);
|
||||
}
|
||||
if (this.keySerializer != null) {
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer);
|
||||
}
|
||||
if (this.retries != null) {
|
||||
properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
|
||||
}
|
||||
if (this.ssl.getKeyPassword() != null) {
|
||||
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
|
||||
}
|
||||
if (this.ssl.getKeystoreLocation() != null) {
|
||||
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
|
||||
}
|
||||
if (this.ssl.getKeystorePassword() != null) {
|
||||
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
|
||||
}
|
||||
if (this.ssl.getTruststoreLocation() != null) {
|
||||
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
|
||||
resourceToPath(this.ssl.getTruststoreLocation()));
|
||||
}
|
||||
if (this.ssl.getTruststorePassword() != null) {
|
||||
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
|
||||
}
|
||||
if (this.valueSerializer != null) {
|
||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer);
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Template {
|
||||
|
||||
/**
|
||||
* Default topic to which messages will be sent.
|
||||
*/
|
||||
private String defaultTopic;
|
||||
|
||||
public String getDefaultTopic() {
|
||||
return this.defaultTopic;
|
||||
}
|
||||
|
||||
public void setDefaultTopic(String defaultTopic) {
|
||||
this.defaultTopic = defaultTopic;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Listener {
|
||||
|
||||
/**
|
||||
* Listener AckMode; see the spring-kafka documentation.
|
||||
*/
|
||||
private AckMode ackMode;
|
||||
|
||||
/**
|
||||
* Number of threads to run in the listener containers.
|
||||
*/
|
||||
private Integer concurrency;
|
||||
|
||||
/**
|
||||
* Timeout in milliseconds to use when polling the consumer.
|
||||
*/
|
||||
private Long pollTimeout;
|
||||
|
||||
/**
|
||||
* Number of records between offset commits when ackMode is "COUNT" or
|
||||
* "COUNT_TIME".
|
||||
*/
|
||||
private Integer ackCount;
|
||||
|
||||
/**
|
||||
* Time in milliseconds between offset commits when ackMode is "TIME" or
|
||||
* "COUNT_TIME".
|
||||
*/
|
||||
private Long ackTime;
|
||||
|
||||
public AckMode getAckMode() {
|
||||
return this.ackMode;
|
||||
}
|
||||
|
||||
public void setAckMode(AckMode ackMode) {
|
||||
this.ackMode = ackMode;
|
||||
}
|
||||
|
||||
public Integer getConcurrency() {
|
||||
return this.concurrency;
|
||||
}
|
||||
|
||||
public void setConcurrency(Integer concurrency) {
|
||||
this.concurrency = concurrency;
|
||||
}
|
||||
|
||||
public Long getPollTimeout() {
|
||||
return this.pollTimeout;
|
||||
}
|
||||
|
||||
public void setPollTimeout(Long pollTimeout) {
|
||||
this.pollTimeout = pollTimeout;
|
||||
}
|
||||
|
||||
public Integer getAckCount() {
|
||||
return this.ackCount;
|
||||
}
|
||||
|
||||
public void setAckCount(Integer ackCount) {
|
||||
this.ackCount = ackCount;
|
||||
}
|
||||
|
||||
public Long getAckTime() {
|
||||
return this.ackTime;
|
||||
}
|
||||
|
||||
public void setAckTime(Long ackTime) {
|
||||
this.ackTime = ackTime;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Ssl {
|
||||
|
||||
/**
|
||||
* Password of the private key in the key store file.
|
||||
*/
|
||||
private String keyPassword;
|
||||
|
||||
/**
|
||||
* Location of the key store file.
|
||||
*/
|
||||
private Resource keystoreLocation;
|
||||
|
||||
/**
|
||||
* Store password for the key store file.
|
||||
*/
|
||||
private String keystorePassword;
|
||||
|
||||
/**
|
||||
* Location of the trust store file.
|
||||
*/
|
||||
private Resource truststoreLocation;
|
||||
|
||||
/**
|
||||
* Store password for the trust store file.
|
||||
*/
|
||||
private String truststorePassword;
|
||||
|
||||
public String getKeyPassword() {
|
||||
return this.keyPassword;
|
||||
}
|
||||
|
||||
public void setKeyPassword(String keyPassword) {
|
||||
this.keyPassword = keyPassword;
|
||||
}
|
||||
|
||||
public Resource getKeystoreLocation() {
|
||||
return this.keystoreLocation;
|
||||
}
|
||||
|
||||
public void setKeystoreLocation(Resource keystoreLocation) {
|
||||
this.keystoreLocation = keystoreLocation;
|
||||
}
|
||||
|
||||
public String getKeystorePassword() {
|
||||
return this.keystorePassword;
|
||||
}
|
||||
|
||||
public void setKeystorePassword(String keystorePassword) {
|
||||
this.keystorePassword = keystorePassword;
|
||||
}
|
||||
|
||||
public Resource getTruststoreLocation() {
|
||||
return this.truststoreLocation;
|
||||
}
|
||||
|
||||
public void setTruststoreLocation(Resource truststoreLocation) {
|
||||
this.truststoreLocation = truststoreLocation;
|
||||
}
|
||||
|
||||
public String getTruststorePassword() {
|
||||
return this.truststorePassword;
|
||||
}
|
||||
|
||||
public void setTruststorePassword(String truststorePassword) {
|
||||
this.truststorePassword = truststorePassword;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Auto-configuration for Apache Kafka.
|
||||
*/
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
|
@ -409,6 +409,76 @@
|
|||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "spring.kafka.consumer.auto-offset-reset",
|
||||
"values": [
|
||||
{
|
||||
"value": "earliest",
|
||||
"description": "Automatically reset the offset to the earliest offset."
|
||||
},
|
||||
{
|
||||
"value": "latest",
|
||||
"description": "Automatically reset the offset to the latest offset."
|
||||
},
|
||||
{
|
||||
"value": "none",
|
||||
"description": "Throw exception to the consumer if no previous offset is found for the consumer's group."
|
||||
},
|
||||
{
|
||||
"value": "exception",
|
||||
"description": "Throw exception to the consumer."
|
||||
}
|
||||
],
|
||||
"providers": [
|
||||
{
|
||||
"name": "any"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "spring.kafka.consumer.key-deserializer",
|
||||
"providers": [
|
||||
{
|
||||
"name": "handle-as",
|
||||
"parameters": {
|
||||
"target": "org.apache.kafka.common.serialization.Deserializer"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "spring.kafka.consumer.value-deserializer",
|
||||
"providers": [
|
||||
{
|
||||
"name": "handle-as",
|
||||
"parameters": {
|
||||
"target": "org.apache.kafka.common.serialization.Deserializer"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "spring.kafka.producer.key-serializer",
|
||||
"providers": [
|
||||
{
|
||||
"name": "handle-as",
|
||||
"parameters": {
|
||||
"target": "org.apache.kafka.common.serialization.Serializer"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "spring.kafka.producer.value-serializer",
|
||||
"providers": [
|
||||
{
|
||||
"name": "handle-as",
|
||||
"parameters": {
|
||||
"target": "org.apache.kafka.common.serialization.Serializer"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "spring.http.converters.preferred-json-mapper",
|
||||
"values": [
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration,\
|
|||
org.springframework.boot.autoconfigure.groovy.template.GroovyTemplateAutoConfiguration,\
|
||||
org.springframework.boot.autoconfigure.jersey.JerseyAutoConfiguration,\
|
||||
org.springframework.boot.autoconfigure.jooq.JooqAutoConfiguration,\
|
||||
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,\
|
||||
org.springframework.boot.autoconfigure.liquibase.LiquibaseAutoConfiguration,\
|
||||
org.springframework.boot.autoconfigure.mail.MailSenderAutoConfiguration,\
|
||||
org.springframework.boot.autoconfigure.mail.MailSenderValidatorAutoConfiguration,\
|
||||
|
|
|
|||
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.test.util.EnvironmentTestUtils;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Integration tests for {@link KafkaAutoConfiguration}.
|
||||
*
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class KafkaAutoConfigurationIntegrationTests {
|
||||
|
||||
private static final String TEST_TOPIC = "testTopic";
|
||||
|
||||
@ClassRule
|
||||
public static final KafkaEmbedded kafkaEmbedded =
|
||||
new KafkaEmbedded(1, true, TEST_TOPIC);
|
||||
|
||||
private AnnotationConfigApplicationContext context;
|
||||
|
||||
@After
|
||||
public void close() {
|
||||
if (this.context != null) {
|
||||
this.context.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndToEnd() throws Exception {
|
||||
load(KafkaConfig.class,
|
||||
"spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
|
||||
"spring.kafka.consumer.group-id=testGroup",
|
||||
"spring.kafka.consumer.auto-offset-reset=earliest");
|
||||
@SuppressWarnings("unchecked")
|
||||
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
|
||||
template.send(TEST_TOPIC, "foo", "bar");
|
||||
Listener listener = this.context.getBean(Listener.class);
|
||||
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(listener.key).isEqualTo("foo");
|
||||
assertThat(listener.received).isEqualTo("bar");
|
||||
}
|
||||
|
||||
private void load(Class<?> config, String... environment) {
|
||||
this.context = doLoad(new Class<?>[] { config }, environment);
|
||||
}
|
||||
|
||||
private AnnotationConfigApplicationContext doLoad(Class<?>[] configs,
|
||||
String... environment) {
|
||||
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
|
||||
applicationContext.register(configs);
|
||||
applicationContext.register(KafkaAutoConfiguration.class);
|
||||
EnvironmentTestUtils.addEnvironment(applicationContext, environment);
|
||||
applicationContext.refresh();
|
||||
return applicationContext;
|
||||
}
|
||||
|
||||
public static class KafkaConfig {
|
||||
|
||||
@Bean
|
||||
public Listener listener() {
|
||||
return new Listener();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Listener {
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private volatile String received;
|
||||
|
||||
private volatile String key;
|
||||
|
||||
@KafkaListener(topics = TEST_TOPIC)
|
||||
public void listen(String foo,
|
||||
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
|
||||
this.received = foo;
|
||||
this.key = key;
|
||||
this.latch.countDown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
/*
|
||||
* Copyright 2012-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.LongSerializer;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.boot.test.util.EnvironmentTestUtils;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
|
||||
/**
|
||||
* Tests for {@link KafkaAutoConfiguration}.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @author Stephane Nicoll
|
||||
* @since 1.5
|
||||
*/
|
||||
public class KafkaAutoConfigurationTests {
|
||||
|
||||
private AnnotationConfigApplicationContext context;
|
||||
|
||||
@After
|
||||
public void closeContext() {
|
||||
if (this.context != null) {
|
||||
this.context.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumerProperties() {
|
||||
load("spring.kafka.bootstrap-servers=foo:1234",
|
||||
"spring.kafka.ssl.key-password=p1",
|
||||
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
|
||||
"spring.kafka.ssl.keystore-password=p2",
|
||||
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
|
||||
"spring.kafka.ssl.truststore-password=p3",
|
||||
"spring.kafka.consumer.auto-commit-interval=123",
|
||||
"spring.kafka.consumer.auto-offset-reset=earliest",
|
||||
"spring.kafka.consumer.client-id=ccid", // test override common
|
||||
"spring.kafka.consumer.enable-auto-commit=false",
|
||||
"spring.kafka.consumer.fetch-max-wait=456",
|
||||
"spring.kafka.consumer.fetch-min-size=789",
|
||||
"spring.kafka.consumer.group-id=bar",
|
||||
"spring.kafka.consumer.heartbeat-interval=234",
|
||||
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
|
||||
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(consumerFactory)
|
||||
.getPropertyValue("configs");
|
||||
// common
|
||||
assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
||||
.isEqualTo(Collections.singletonList("foo:1234"));
|
||||
assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
|
||||
assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
||||
.endsWith(File.separator + "ksLoc");
|
||||
assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
|
||||
assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
||||
.endsWith(File.separator + "tsLoc");
|
||||
assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3");
|
||||
// consumer
|
||||
assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
|
||||
assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE);
|
||||
assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123L);
|
||||
assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
|
||||
assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
|
||||
assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
|
||||
assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
|
||||
assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
|
||||
assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class);
|
||||
assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
|
||||
.isEqualTo(IntegerDeserializer.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void producerProperties() {
|
||||
load("spring.kafka.clientId=cid",
|
||||
"spring.kafka.producer.acks=all",
|
||||
"spring.kafka.producer.batch-size=20",
|
||||
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override common
|
||||
"spring.kafka.producer.buffer-memory=12345",
|
||||
"spring.kafka.producer.compression-type=gzip",
|
||||
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
|
||||
"spring.kafka.producer.retries=2",
|
||||
"spring.kafka.producer.ssl.key-password=p4",
|
||||
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
|
||||
"spring.kafka.producer.ssl.keystore-password=p5",
|
||||
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
|
||||
"spring.kafka.producer.ssl.truststore-password=p6",
|
||||
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
|
||||
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(producerFactory)
|
||||
.getPropertyValue("configs");
|
||||
// common
|
||||
assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
||||
// producer
|
||||
assertThat(producerProps.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
|
||||
assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
|
||||
assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
||||
.isEqualTo(Collections.singletonList("bar:1234")); // override
|
||||
assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
|
||||
assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
|
||||
assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
|
||||
assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
||||
assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
||||
.endsWith(File.separator + "ksLocP");
|
||||
assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
|
||||
assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
||||
.endsWith(File.separator + "tsLocP");
|
||||
assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
|
||||
assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
|
||||
assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void listenerProperties() {
|
||||
load("spring.kafka.template.default-topic=testTopic",
|
||||
|
||||
"spring.kafka.listener.ack-mode=MANUAL",
|
||||
"spring.kafka.listener.ack-count=123",
|
||||
"spring.kafka.listener.ack-time=456",
|
||||
"spring.kafka.listener.concurrency=3",
|
||||
"spring.kafka.listener.poll-timeout=2000");
|
||||
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
|
||||
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
|
||||
KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class);
|
||||
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context
|
||||
.getBean(KafkaListenerContainerFactory.class);
|
||||
assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory"))
|
||||
.isEqualTo(producerFactory);
|
||||
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
|
||||
assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
|
||||
assertThat(dfa.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL);
|
||||
assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
|
||||
assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
|
||||
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
|
||||
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L);
|
||||
}
|
||||
|
||||
private void load(String... environment) {
|
||||
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
|
||||
ctx.register(KafkaAutoConfiguration.class);
|
||||
EnvironmentTestUtils.addEnvironment(ctx, environment);
|
||||
ctx.refresh();
|
||||
this.context = ctx;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
Test file for Kafka.
|
||||
|
|
@ -0,0 +1 @@
|
|||
Test file for Kafka.
|
||||
|
|
@ -0,0 +1 @@
|
|||
Test file for Kafka.
|
||||
|
|
@ -0,0 +1 @@
|
|||
Test file for Kafka.
|
||||
|
|
@ -156,6 +156,7 @@
|
|||
<spring-hateoas.version>0.21.0.RELEASE</spring-hateoas.version>
|
||||
<spring-integration.version>4.3.5.RELEASE</spring-integration.version>
|
||||
<spring-integration-java-dsl.version>1.2.0.RELEASE</spring-integration-java-dsl.version>
|
||||
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
|
||||
<spring-loaded.version>1.2.6.RELEASE</spring-loaded.version>
|
||||
<spring-mobile.version>1.1.5.RELEASE</spring-mobile.version>
|
||||
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
|
||||
|
|
@ -2123,6 +2124,11 @@
|
|||
<artifactId>spring-hateoas</artifactId>
|
||||
<version>${spring-hateoas.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-bom</artifactId>
|
||||
|
|
|
|||
|
|
@ -856,6 +856,41 @@ content into your application; rather pick only the properties that you need.
|
|||
spring.jms.template.receive-timeout= # Timeout to use for receive calls in milliseconds.
|
||||
spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set.
|
||||
|
||||
# APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties])
|
||||
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
|
||||
spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging.
|
||||
spring.kafka.consumer.auto-commit-interval= # Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true.
|
||||
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server.
|
||||
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
|
||||
spring.kafka.consumer.client-id= # Id to pass to the server when making requests; used for server-side logging.
|
||||
spring.kafka.consumer.enable-auto-commit= # If true the consumer's offset will be periodically committed in the background.
|
||||
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes".
|
||||
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request in bytes.
|
||||
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
|
||||
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
|
||||
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
|
||||
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
|
||||
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
|
||||
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
|
||||
spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
|
||||
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
|
||||
spring.kafka.listener.poll-timeout= # Timeout in milliseconds to use when polling the consumer.
|
||||
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
|
||||
spring.kafka.producer.batch-size= # Number of records to batch before sending.
|
||||
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
|
||||
spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
|
||||
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
|
||||
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
|
||||
spring.kafka.producer.key-serializer= # Serializer class for keys.
|
||||
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
|
||||
spring.kafka.producer.value-serializer= # Serializer class for values.
|
||||
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
|
||||
spring.kafka.ssl.keystore-location= # Location of the key store file.
|
||||
spring.kafka.ssl.keystore-password= # Store password for the key store file.
|
||||
spring.kafka.ssl.truststore-location= # Location of the trust store file.
|
||||
spring.kafka.ssl.truststore-password= # Store password for the trust store file.
|
||||
spring.kafka.template.default-topic= # Default topic to which messages will be sent.
|
||||
|
||||
# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])
|
||||
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.
|
||||
spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached.
|
||||
|
|
|
|||
|
|
@ -4082,7 +4082,7 @@ receive messages asynchronously. Spring AMQP provides a similar feature set for
|
|||
'`Advanced Message Queuing Protocol`' and Spring Boot also provides auto-configuration
|
||||
options for `RabbitTemplate` and RabbitMQ. There is also support for STOMP messaging
|
||||
natively in Spring WebSocket and Spring Boot has support for that through starters and a
|
||||
small amount of auto-configuration.
|
||||
small amount of auto-configuration. Spring Boot also has support for Apache Kafka.
|
||||
|
||||
|
||||
|
||||
|
|
@ -4454,6 +4454,108 @@ reached.
|
|||
|
||||
|
||||
|
||||
[[boot-features-kafka]]
|
||||
|
||||
=== Apache Kafka Support
|
||||
|
||||
http://kafka.apache.org/[Apache Kafa] is supported by providing auto-configuration of the
|
||||
`spring-kafka` project.
|
||||
|
||||
Kafka configuration is controlled by external configuration properties in
|
||||
`spring.kafka.*`. For example, you might declare the following section in
|
||||
`application.properties`:
|
||||
|
||||
[source,properties,indent=0]
|
||||
----
|
||||
spring.kafka.bootstrap-servers=localhost:9092
|
||||
spring.kafka.consumer.group-id=myGroup
|
||||
----
|
||||
|
||||
See {sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[`KafkaProperties`]
|
||||
for more of the supported options.
|
||||
|
||||
|
||||
|
||||
=== Sending a Message
|
||||
|
||||
Spring's `KafkaTemplate` is auto-configured and you can autowire them directly in your own
|
||||
beans:
|
||||
|
||||
[source,java,indent=0]
|
||||
----
|
||||
@Component
|
||||
public class MyBean {
|
||||
|
||||
private final KafkaTemplate kafkaTemplate;
|
||||
|
||||
@Autowired
|
||||
public MyBean(KafkaTemplate kafkaTemplate) {
|
||||
this.kafkaTemplate = kafkaTemplate;
|
||||
}
|
||||
|
||||
// ...
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
|
||||
=== Receiving a Message
|
||||
|
||||
When the Apache Kafka infrastructure is present, any bean can be annotated with
|
||||
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory`
|
||||
has been defined, a default one is configured automatically with keys defined in
|
||||
`spring.kafka.listener.*`.
|
||||
|
||||
The following component creates a listener endpoint on the `someTopic` topic:
|
||||
|
||||
|
||||
[source,java,indent=0]
|
||||
----
|
||||
@Component
|
||||
public class MyBean {
|
||||
|
||||
@KafkaListener(topics = "someTopic")
|
||||
public void processMessage(String content) {
|
||||
// ...
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
|
||||
[[kafka-extra-props]]
|
||||
=== Additional Kafka Properties
|
||||
|
||||
The properties supported by auto configuration are shown in
|
||||
<<common-application-properties>>. Note that these properties (hyphenated or camelCase)
|
||||
map directly to the Apache Kafka dotted properties for the most part, refer to the Apache
|
||||
Kafka documentation for details.
|
||||
|
||||
The first few of these properties apply to both producers and consumers, but can be
|
||||
specified at the producer or consumer level if you wish to use different values for each.
|
||||
Apache Kafka designates properties with an importance: HIGH, MEDIUM and LOW. Spring Boot
|
||||
auto configuration supports all HIGH importance properties, some selected MEDIUM and LOW,
|
||||
and any that do not have a default value.
|
||||
|
||||
Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
|
||||
class. If you wish to configure the producer or consumer with additional properties, you
|
||||
can override the producer factory and/or consumer factory bean, adding additional
|
||||
properties, for example:
|
||||
|
||||
[source,java,indent=0]
|
||||
----
|
||||
@Bean
|
||||
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
|
||||
Map<String, Object> producerProperties = properties.buildProducerProperties();
|
||||
producerProperties.put("some.property", "some.value");
|
||||
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
|
||||
[[boot-features-restclient]]
|
||||
== Calling REST services
|
||||
If you need to call remote REST services from your application, you can use Spring
|
||||
|
|
@ -4514,8 +4616,6 @@ Lastly, the most extreme (and rarely used) option is to create your own
|
|||
`RestTemplateBuilder` bean. This will switch off the auto-configuration of a
|
||||
`RestTemplateBuilder` and will prevent any `RestTemplateCustomizer` beans from being used.
|
||||
|
||||
|
||||
|
||||
[[boot-features-email]]
|
||||
== Sending email
|
||||
The Spring Framework provides an easy abstraction for sending email using the
|
||||
|
|
|
|||
Loading…
Reference in New Issue