Add support for additional Kafka listener properties
See gh-11502
This commit is contained in:
parent
2a7b2f304a
commit
6fcbf80b31
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2017 the original author or authors.
|
* Copyright 2012-2018 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -101,6 +101,11 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
||||||
.to(container::setAckTime);
|
.to(container::setAckTime);
|
||||||
map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis)
|
map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis)
|
||||||
.to(container::setPollTimeout);
|
.to(container::setPollTimeout);
|
||||||
|
map.from(properties::getClientId).whenNonNull().to(container::setClientId);
|
||||||
|
map.from(properties::getIdleEventInterval).whenNonNull().to(container::setIdleEventInterval);
|
||||||
|
map.from(properties::getMonitorInterval).whenNonNull().to(container::setMonitorInterval);
|
||||||
|
map.from(properties::getNoPollThreshold).whenNonNull().to(container::setNoPollThreshold);
|
||||||
|
map.from(properties::getLogContainerConfig).whenNonNull().to(container::setLogContainerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2017 the original author or authors.
|
* Copyright 2012-2018 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -818,6 +818,31 @@ public class KafkaProperties {
|
||||||
*/
|
*/
|
||||||
private Duration ackTime;
|
private Duration ackTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prefix for the listener's consumer client.id property.
|
||||||
|
*/
|
||||||
|
private String clientId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interval (ms) between publishing idle consumer events (no data received).
|
||||||
|
*/
|
||||||
|
private Long idleEventInterval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interval (seconds) between checks for non-responsive consumers.
|
||||||
|
*/
|
||||||
|
private Integer monitorInterval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Multiplier applied to pollTimeout to determine if a consumer is non-responsive.
|
||||||
|
*/
|
||||||
|
private Float noPollThreshold;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When true, log the container configuration during initialization (INFO level).
|
||||||
|
*/
|
||||||
|
private Boolean logContainerConfig;
|
||||||
|
|
||||||
public Type getType() {
|
public Type getType() {
|
||||||
return this.type;
|
return this.type;
|
||||||
}
|
}
|
||||||
|
@ -866,6 +891,46 @@ public class KafkaProperties {
|
||||||
this.ackTime = ackTime;
|
this.ackTime = ackTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getClientId() {
|
||||||
|
return this.clientId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClientId(String clientId) {
|
||||||
|
this.clientId = clientId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getIdleEventInterval() {
|
||||||
|
return this.idleEventInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIdleEventInterval(Long idleEventInterval) {
|
||||||
|
this.idleEventInterval = idleEventInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMonitorInterval() {
|
||||||
|
return this.monitorInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMonitorInterval(Integer monitorInterval) {
|
||||||
|
this.monitorInterval = monitorInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Float getNoPollThreshold() {
|
||||||
|
return this.noPollThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNoPollThreshold(Float noPollThreshold) {
|
||||||
|
this.noPollThreshold = noPollThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getLogContainerConfig() {
|
||||||
|
return this.logContainerConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLogContainerConfig(Boolean logContainerConfig) {
|
||||||
|
this.logContainerConfig = logContainerConfig;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Ssl {
|
public static class Ssl {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2017 the original author or authors.
|
* Copyright 2012-2018 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -259,6 +259,11 @@ public class KafkaAutoConfigurationTests {
|
||||||
"spring.kafka.listener.concurrency=3",
|
"spring.kafka.listener.concurrency=3",
|
||||||
"spring.kafka.listener.poll-timeout=2000",
|
"spring.kafka.listener.poll-timeout=2000",
|
||||||
"spring.kafka.listener.type=batch",
|
"spring.kafka.listener.type=batch",
|
||||||
|
"spring.kafka.listener.client-id=client",
|
||||||
|
"spring.kafka.listener.idle-event-interval=12345",
|
||||||
|
"spring.kafka.listener.monitor-interval=45",
|
||||||
|
"spring.kafka.listener.no-poll-threshold=2.5",
|
||||||
|
"spring.kafka.listener.log-container-config=true",
|
||||||
"spring.kafka.jaas.enabled=true",
|
"spring.kafka.jaas.enabled=true",
|
||||||
"spring.kafka.producer.transaction-id-prefix=foo",
|
"spring.kafka.producer.transaction-id-prefix=foo",
|
||||||
"spring.kafka.jaas.login-module=foo",
|
"spring.kafka.jaas.login-module=foo",
|
||||||
|
@ -292,6 +297,16 @@ public class KafkaAutoConfigurationTests {
|
||||||
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
|
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
|
||||||
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
|
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
|
||||||
.isEqualTo(2000L);
|
.isEqualTo(2000L);
|
||||||
|
assertThat(dfa.getPropertyValue("containerProperties.clientId"))
|
||||||
|
.isEqualTo("client");
|
||||||
|
assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval"))
|
||||||
|
.isEqualTo(12345L);
|
||||||
|
assertThat(dfa.getPropertyValue("containerProperties.monitorInterval"))
|
||||||
|
.isEqualTo(45);
|
||||||
|
assertThat(dfa.getPropertyValue("containerProperties.noPollThreshold"))
|
||||||
|
.isEqualTo(2.5f);
|
||||||
|
assertThat(dfa.getPropertyValue("containerProperties.logContainerConfig"))
|
||||||
|
.isEqualTo(Boolean.TRUE);
|
||||||
assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true);
|
assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true);
|
||||||
assertThat(
|
assertThat(
|
||||||
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
|
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
|
||||||
|
|
|
@ -149,7 +149,7 @@
|
||||||
<spring-data-releasetrain.version>Kay-SR2</spring-data-releasetrain.version>
|
<spring-data-releasetrain.version>Kay-SR2</spring-data-releasetrain.version>
|
||||||
<spring-hateoas.version>0.24.0.RELEASE</spring-hateoas.version>
|
<spring-hateoas.version>0.24.0.RELEASE</spring-hateoas.version>
|
||||||
<spring-integration.version>5.0.1.BUILD-SNAPSHOT</spring-integration.version>
|
<spring-integration.version>5.0.1.BUILD-SNAPSHOT</spring-integration.version>
|
||||||
<spring-kafka.version>2.1.0.RELEASE</spring-kafka.version>
|
<spring-kafka.version>2.1.1.BUILD-SNAPSHOT</spring-kafka.version>
|
||||||
<spring-ldap.version>2.3.2.RELEASE</spring-ldap.version>
|
<spring-ldap.version>2.3.2.RELEASE</spring-ldap.version>
|
||||||
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
|
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
|
||||||
<spring-restdocs.version>2.0.0.RELEASE</spring-restdocs.version>
|
<spring-restdocs.version>2.0.0.RELEASE</spring-restdocs.version>
|
||||||
|
|
|
@ -987,7 +987,12 @@ content into your application. Rather, pick only the properties that you need.
|
||||||
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
|
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
|
||||||
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
|
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
|
||||||
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
|
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
|
||||||
|
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
|
||||||
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
|
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
|
||||||
|
spring.kafka.listener.idle-event-interval= # Interval (ms) between publishing idle consumer events (no data received).
|
||||||
|
spring.kafka.listener.log-container-config= # When true, log the container configuration during initialization (INFO level).
|
||||||
|
spring.kafka.listener.monitor-interval= # Interval (seconds) between checks for non-responsive consumers.
|
||||||
|
spring.kafka.listener.no-poll-threshold= # Multiplier applied to pollTimeout to determine if a consumer is non-responsive.
|
||||||
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
|
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
|
||||||
spring.kafka.listener.type=single # Listener type.
|
spring.kafka.listener.type=single # Listener type.
|
||||||
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
|
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
|
||||||
|
|
Loading…
Reference in New Issue