Merge pull request #41335 from travisriegler
* gh-41335: Polish "Group Kafka back-off properties" Group Kafka back-off properties Closes gh-41335
This commit is contained in:
commit
0a3e799687
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
* Copyright 2012-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic.Backoff;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.boot.ssl.SslBundles;
|
||||
|
|
@ -186,7 +186,7 @@ public class KafkaAutoConfiguration {
|
|||
.useSingleTopicForSameIntervals()
|
||||
.suffixTopicsWithIndexValues()
|
||||
.doNotAutoCreateRetryTopics();
|
||||
setBackOffPolicy(builder, retryTopic);
|
||||
setBackOffPolicy(builder, retryTopic.getBackoff());
|
||||
return builder.create(kafkaTemplate);
|
||||
}
|
||||
|
||||
|
|
@ -214,15 +214,15 @@ public class KafkaAutoConfiguration {
|
|||
}
|
||||
}
|
||||
|
||||
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
|
||||
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
|
||||
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Backoff retryTopicBackoff) {
|
||||
long delay = (retryTopicBackoff.getDelay() != null) ? retryTopicBackoff.getDelay().toMillis() : 0;
|
||||
if (delay > 0) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
|
||||
map.from(delay).to(backOffPolicy::delay);
|
||||
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
|
||||
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
|
||||
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
|
||||
map.from(retryTopicBackoff.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
|
||||
map.from(retryTopicBackoff.getMultiplier()).to(backOffPolicy::multiplier);
|
||||
map.from(retryTopicBackoff.isRandom()).to(backOffPolicy::random);
|
||||
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
|
||||
}
|
||||
else {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
|||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
|
||||
import org.springframework.boot.convert.DurationUnit;
|
||||
|
|
@ -1547,28 +1548,6 @@ public class KafkaProperties {
|
|||
*/
|
||||
private int attempts = 3;
|
||||
|
||||
/**
|
||||
* Canonical backoff period. Used as an initial value in the exponential case,
|
||||
* and as a minimum value in the uniform case.
|
||||
*/
|
||||
private Duration delay = Duration.ofSeconds(1);
|
||||
|
||||
/**
|
||||
* Multiplier to use for generating the next backoff delay.
|
||||
*/
|
||||
private double multiplier = 0.0;
|
||||
|
||||
/**
|
||||
* Maximum wait between retries. If less than the delay then the default of 30
|
||||
* seconds is applied.
|
||||
*/
|
||||
private Duration maxDelay = Duration.ZERO;
|
||||
|
||||
/**
|
||||
* Whether to have the backoff delays.
|
||||
*/
|
||||
private boolean randomBackOff = false;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
|
@ -1585,36 +1564,113 @@ public class KafkaProperties {
|
|||
this.attempts = attempts;
|
||||
}
|
||||
|
||||
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.delay", since = "3.4.0")
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public Duration getDelay() {
|
||||
return this.delay;
|
||||
return getBackoff().getDelay();
|
||||
}
|
||||
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public void setDelay(Duration delay) {
|
||||
this.delay = delay;
|
||||
getBackoff().setDelay(delay);
|
||||
}
|
||||
|
||||
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.multiplier",
|
||||
since = "3.4.0")
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public double getMultiplier() {
|
||||
return this.multiplier;
|
||||
return getBackoff().getMultiplier();
|
||||
}
|
||||
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public void setMultiplier(double multiplier) {
|
||||
this.multiplier = multiplier;
|
||||
getBackoff().setMultiplier(multiplier);
|
||||
}
|
||||
|
||||
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.maxDelay", since = "3.4.0")
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public Duration getMaxDelay() {
|
||||
return this.maxDelay;
|
||||
return getBackoff().getMaxDelay();
|
||||
}
|
||||
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public void setMaxDelay(Duration maxDelay) {
|
||||
this.maxDelay = maxDelay;
|
||||
getBackoff().setMaxDelay(maxDelay);
|
||||
}
|
||||
|
||||
@DeprecatedConfigurationProperty(replacement = "spring.kafka.retry.topic.backoff.random", since = "3.4.0")
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public boolean isRandomBackOff() {
|
||||
return this.randomBackOff;
|
||||
return getBackoff().isRandom();
|
||||
}
|
||||
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
public void setRandomBackOff(boolean randomBackOff) {
|
||||
this.randomBackOff = randomBackOff;
|
||||
getBackoff().setRandom(randomBackOff);
|
||||
}
|
||||
|
||||
private final Backoff backoff = new Backoff();
|
||||
|
||||
public Backoff getBackoff() {
|
||||
return this.backoff;
|
||||
}
|
||||
|
||||
public static class Backoff {
|
||||
|
||||
/**
|
||||
* Canonical backoff period. Used as an initial value in the exponential
|
||||
* case, and as a minimum value in the uniform case.
|
||||
*/
|
||||
private Duration delay = Duration.ofSeconds(1);
|
||||
|
||||
/**
|
||||
* Multiplier to use for generating the next backoff delay.
|
||||
*/
|
||||
private double multiplier = 0.0;
|
||||
|
||||
/**
|
||||
* Maximum wait between retries. If less than the delay then the default
|
||||
* of 30 seconds is applied.
|
||||
*/
|
||||
private Duration maxDelay = Duration.ZERO;
|
||||
|
||||
/**
|
||||
* Whether to have the backoff delays.
|
||||
*/
|
||||
private boolean random = false;
|
||||
|
||||
public Duration getDelay() {
|
||||
return this.delay;
|
||||
}
|
||||
|
||||
public void setDelay(Duration delay) {
|
||||
this.delay = delay;
|
||||
}
|
||||
|
||||
public double getMultiplier() {
|
||||
return this.multiplier;
|
||||
}
|
||||
|
||||
public void setMultiplier(double multiplier) {
|
||||
this.multiplier = multiplier;
|
||||
}
|
||||
|
||||
public Duration getMaxDelay() {
|
||||
return this.maxDelay;
|
||||
}
|
||||
|
||||
public void setMaxDelay(Duration maxDelay) {
|
||||
this.maxDelay = maxDelay;
|
||||
}
|
||||
|
||||
public boolean isRandom() {
|
||||
return this.random;
|
||||
}
|
||||
|
||||
public void setRandom(boolean random) {
|
||||
this.random = random;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -442,6 +442,22 @@ class KafkaAutoConfigurationTests {
|
|||
|
||||
@Test
|
||||
void retryTopicConfigurationWithExponentialBackOff() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.backoff.delay=100ms",
|
||||
"spring.kafka.retry.topic.backoff.multiplier=2", "spring.kafka.retry.topic.backoff.max-delay=300ms")
|
||||
.run((context) -> {
|
||||
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class);
|
||||
assertThat(configuration.getDestinationTopicProperties()).hasSize(5)
|
||||
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
|
||||
.containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"),
|
||||
tuple(300L, "-retry-2"), tuple(0L, "-dlt"));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
void retryTopicConfigurationWithExponentialBackOffUsingDeprecatedProperties() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
|
||||
|
|
@ -471,6 +487,18 @@ class KafkaAutoConfigurationTests {
|
|||
|
||||
@Test
|
||||
void retryTopicConfigurationWithFixedBackOff() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=2s")
|
||||
.run(assertRetryTopicConfiguration(
|
||||
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
|
||||
.extracting(DestinationTopic.Properties::delay)
|
||||
.containsExactly(0L, 2000L, 0L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
void retryTopicConfigurationWithFixedBackOffUsingDeprecatedProperties() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s")
|
||||
|
|
@ -482,6 +510,18 @@ class KafkaAutoConfigurationTests {
|
|||
|
||||
@Test
|
||||
void retryTopicConfigurationWithNoBackOff() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.backoff.delay=0")
|
||||
.run(assertRetryTopicConfiguration(
|
||||
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
|
||||
.extracting(DestinationTopic.Properties::delay)
|
||||
.containsExactly(0L, 0L, 0L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Deprecated(since = "3.4.0", forRemoval = true)
|
||||
void retryTopicConfigurationWithNoBackOffUsingDeprecatedProperties() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")
|
||||
|
|
|
|||
Loading…
Reference in New Issue