Group Kafka back-off properties
Kafka back-off policy properties "delay", "max-delay", "multiplier", and "random-back-off" are now defined in a common "backoff" group: - spring.kafka.retry.topic.backoff.delay - spring.kafka.retry.topic.backoff.maxDelay - spring.kafka.retry.topic.backoff.multiplier - spring.kafka.retry.topic.backoff.random See gh-41335
This commit is contained in:
parent
d07fe47102
commit
14c9893371
|
|
@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
|
||||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
|
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
|
||||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
|
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic.Backoff;
|
||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
import org.springframework.boot.context.properties.PropertyMapper;
|
import org.springframework.boot.context.properties.PropertyMapper;
|
||||||
import org.springframework.boot.ssl.SslBundles;
|
import org.springframework.boot.ssl.SslBundles;
|
||||||
|
|
@ -186,7 +186,7 @@ public class KafkaAutoConfiguration {
|
||||||
.useSingleTopicForSameIntervals()
|
.useSingleTopicForSameIntervals()
|
||||||
.suffixTopicsWithIndexValues()
|
.suffixTopicsWithIndexValues()
|
||||||
.doNotAutoCreateRetryTopics();
|
.doNotAutoCreateRetryTopics();
|
||||||
setBackOffPolicy(builder, retryTopic);
|
setBackOffPolicy(builder, retryTopic.getBackoff());
|
||||||
return builder.create(kafkaTemplate);
|
return builder.create(kafkaTemplate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -214,15 +214,15 @@ public class KafkaAutoConfiguration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
|
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Backoff retryTopicBackoff) {
|
||||||
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
|
long delay = (retryTopicBackoff.getDelay() != null) ? retryTopicBackoff.getDelay().toMillis() : 0;
|
||||||
if (delay > 0) {
|
if (delay > 0) {
|
||||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||||
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
|
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
|
||||||
map.from(delay).to(backOffPolicy::delay);
|
map.from(delay).to(backOffPolicy::delay);
|
||||||
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
|
map.from(retryTopicBackoff.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
|
||||||
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
|
map.from(retryTopicBackoff.getMultiplier()).to(backOffPolicy::multiplier);
|
||||||
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
|
map.from(retryTopicBackoff.isRandom()).to(backOffPolicy::random);
|
||||||
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
|
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
import org.apache.kafka.common.serialization.StringSerializer;
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
|
||||||
import org.springframework.boot.context.properties.PropertyMapper;
|
import org.springframework.boot.context.properties.PropertyMapper;
|
||||||
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
|
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
|
||||||
import org.springframework.boot.convert.DurationUnit;
|
import org.springframework.boot.convert.DurationUnit;
|
||||||
|
|
@ -1585,6 +1586,82 @@ public class KafkaProperties {
|
||||||
this.attempts = attempts;
|
this.attempts = attempts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.delay", since = "3.4.0")
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public Duration getDelay() {
|
||||||
|
return getBackoff().getDelay();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public void setDelay(Duration delay) {
|
||||||
|
getBackoff().setDelay(delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.multiplier",
|
||||||
|
since = "3.4.0")
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public double getMultiplier() {
|
||||||
|
return getBackoff().getMultiplier();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public void setMultiplier(double multiplier) {
|
||||||
|
getBackoff().setMultiplier(multiplier);
|
||||||
|
}
|
||||||
|
|
||||||
|
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.maxDelay", since = "3.4.0")
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public Duration getMaxDelay() {
|
||||||
|
return getBackoff().getMaxDelay();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public void setMaxDelay(Duration maxDelay) {
|
||||||
|
getBackoff().setMaxDelay(maxDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.random",
|
||||||
|
since = "3.4.0")
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public boolean isRandomBackOff() {
|
||||||
|
return getBackoff().isRandom();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||||
|
public void setRandomBackOff(boolean randomBackOff) {
|
||||||
|
getBackoff().setRandom(randomBackOff);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Backoff backoff = new Backoff();
|
||||||
|
|
||||||
|
public Backoff getBackoff() {
|
||||||
|
return this.backoff;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Backoff {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Canonical backoff period. Used as an initial value in the exponential
|
||||||
|
* case, and as a minimum value in the uniform case.
|
||||||
|
*/
|
||||||
|
private Duration delay = Duration.ofSeconds(1);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Multiplier to use for generating the next backoff delay.
|
||||||
|
*/
|
||||||
|
private double multiplier = 0.0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum wait between retries. If less than the delay then the default
|
||||||
|
* of 30 seconds is applied.
|
||||||
|
*/
|
||||||
|
private Duration maxDelay = Duration.ZERO;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to have the backoff delays.
|
||||||
|
*/
|
||||||
|
private boolean random = false;
|
||||||
|
|
||||||
public Duration getDelay() {
|
public Duration getDelay() {
|
||||||
return this.delay;
|
return this.delay;
|
||||||
}
|
}
|
||||||
|
|
@ -1609,12 +1686,14 @@ public class KafkaProperties {
|
||||||
this.maxDelay = maxDelay;
|
this.maxDelay = maxDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRandomBackOff() {
|
public boolean isRandom() {
|
||||||
return this.randomBackOff;
|
return this.random;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRandom(boolean random) {
|
||||||
|
this.random = random;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRandomBackOff(boolean randomBackOff) {
|
|
||||||
this.randomBackOff = randomBackOff;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue