Add auto-configuration to Kafka Retry Topics
See gh-29812
This commit is contained in:
parent
de17878bf4
commit
bf46d7244a
|
|
@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
|
@ -33,13 +34,18 @@ import org.springframework.kafka.core.ConsumerFactory;
|
|||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaAdmin;
|
||||
import org.springframework.kafka.core.KafkaOperations;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
|
||||
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
|
||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||
import org.springframework.kafka.transaction.KafkaTransactionManager;
|
||||
import org.springframework.retry.backoff.BackOffPolicyBuilder;
|
||||
import org.springframework.retry.backoff.SleepingBackOffPolicy;
|
||||
|
||||
/**
|
||||
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
|
||||
|
|
@ -48,6 +54,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager;
|
|||
* @author Stephane Nicoll
|
||||
* @author Eddú Meléndez
|
||||
* @author Nakul Mishra
|
||||
* @author Tomaz Fernandes
|
||||
* @since 1.5.0
|
||||
*/
|
||||
@AutoConfiguration
|
||||
|
|
@ -137,4 +144,23 @@ public class KafkaAutoConfiguration {
|
|||
return kafkaAdmin;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
|
||||
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
|
||||
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
|
||||
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
|
||||
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
|
||||
.doNotAutoCreateRetryTopics();
|
||||
setBackOffPolicy(builder, retryTopic);
|
||||
return builder.create(kafkaOperations);
|
||||
}
|
||||
|
||||
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
|
||||
PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff);
|
||||
PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0)
|
||||
.toCall(() -> builder.customBackoff((SleepingBackOffPolicy<?>) BackOffPolicyBuilder.newBuilder()
|
||||
.delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis())
|
||||
.multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
|
|
@ -41,6 +42,7 @@ import org.springframework.boot.convert.DurationUnit;
|
|||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.unit.DataSize;
|
||||
|
||||
|
|
@ -54,6 +56,7 @@ import org.springframework.util.unit.DataSize;
|
|||
* @author Stephane Nicoll
|
||||
* @author Artem Bilan
|
||||
* @author Nakul Mishra
|
||||
* @author Tomaz Fernandes
|
||||
* @since 1.5.0
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.kafka")
|
||||
|
|
@ -94,6 +97,8 @@ public class KafkaProperties {
|
|||
|
||||
private final Security security = new Security();
|
||||
|
||||
private final Retry retry = new Retry();
|
||||
|
||||
public List<String> getBootstrapServers() {
|
||||
return this.bootstrapServers;
|
||||
}
|
||||
|
|
@ -150,6 +155,10 @@ public class KafkaProperties {
|
|||
return this.security;
|
||||
}
|
||||
|
||||
public Retry getRetry() {
|
||||
return this.retry;
|
||||
}
|
||||
|
||||
private Map<String, Object> buildCommonProperties() {
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
if (this.bootstrapServers != null) {
|
||||
|
|
@ -1332,6 +1341,140 @@ public class KafkaProperties {
|
|||
|
||||
}
|
||||
|
||||
public static class Retry {
|
||||
|
||||
private Topic topic = new Topic();
|
||||
|
||||
public Topic getTopic() {
|
||||
return this.topic.validate();
|
||||
}
|
||||
|
||||
public void setTopic(Topic topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Properties for non-blocking, topic-based retries.
|
||||
*/
|
||||
public static class Topic {
|
||||
|
||||
private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic.";
|
||||
|
||||
private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX
|
||||
+ "%s should be greater than or equal to %s. Provided value was %s.";
|
||||
|
||||
/**
|
||||
* Whether to enable topic-based retries auto-configuration.
|
||||
*/
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* The total number of processing attempts made before sending the message to
|
||||
* the DLT.
|
||||
*/
|
||||
private Integer attempts = 3;
|
||||
|
||||
/**
|
||||
* A canonical backoff period. Used as an initial value in the exponential
|
||||
* case, and as a minimum value in the uniform case.
|
||||
*/
|
||||
private Duration delay = Duration.ofSeconds(1);
|
||||
|
||||
/**
|
||||
* If positive, then used as a multiplier for generating the next delay for
|
||||
* backoff.
|
||||
*/
|
||||
private Double multiplier = 0.0;
|
||||
|
||||
/**
|
||||
* The maximum wait between retries. If less than the delay then the default
|
||||
* of 30 seconds is applied.
|
||||
*/
|
||||
private Duration maxDelay = Duration.ZERO;
|
||||
|
||||
/**
|
||||
* In the exponential case, set this to true to have the backoff delays
|
||||
* randomized.
|
||||
*/
|
||||
private Boolean randomBackOff = false;
|
||||
|
||||
public Boolean getEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(Boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public Integer getAttempts() {
|
||||
return this.attempts;
|
||||
}
|
||||
|
||||
public void setAttempts(Integer attempts) {
|
||||
this.attempts = attempts;
|
||||
}
|
||||
|
||||
public Duration getDelay() {
|
||||
return this.delay;
|
||||
}
|
||||
|
||||
public Long getDelayMillis() {
|
||||
return (this.delay != null) ? this.delay.toMillis() : null;
|
||||
}
|
||||
|
||||
public void setDelay(Duration delay) {
|
||||
this.delay = delay;
|
||||
}
|
||||
|
||||
public Double getMultiplier() {
|
||||
return this.multiplier;
|
||||
}
|
||||
|
||||
public void setMultiplier(Double multiplier) {
|
||||
this.multiplier = multiplier;
|
||||
}
|
||||
|
||||
public Duration getMaxDelay() {
|
||||
return this.maxDelay;
|
||||
}
|
||||
|
||||
public void setMaxDelay(Duration maxDelay) {
|
||||
this.maxDelay = maxDelay;
|
||||
}
|
||||
|
||||
public Long getMaxDelayMillis() {
|
||||
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
|
||||
}
|
||||
|
||||
public Boolean isRandomBackOff() {
|
||||
return this.randomBackOff;
|
||||
}
|
||||
|
||||
public void setRandomBackOff(Boolean randomBackOff) {
|
||||
this.randomBackOff = randomBackOff;
|
||||
}
|
||||
|
||||
private Topic validate() {
|
||||
validateProperty("attempts", this.attempts, 1);
|
||||
validateProperty("delay", this.getDelayMillis(), 0);
|
||||
validateProperty("multiplier", this.multiplier, 0);
|
||||
validateProperty("maxDelay", this.getMaxDelayMillis(), 0);
|
||||
Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(),
|
||||
"Property " + RETRY_TOPIC_PROPERTIES_PREFIX
|
||||
+ "randomBackOff should not be true with non-exponential back offs.");
|
||||
return this;
|
||||
}
|
||||
|
||||
private static void validateProperty(String propertyName, Number providedValue, int minValue) {
|
||||
Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null.");
|
||||
Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0,
|
||||
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Security {
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
package org.springframework.boot.autoconfigure.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
|
@ -41,6 +43,8 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
|||
import org.springframework.kafka.config.TopicBuilder;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.retrytopic.DestinationTopic;
|
||||
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
|
|
@ -53,12 +57,14 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|||
*
|
||||
* @author Gary Russell
|
||||
* @author Stephane Nicoll
|
||||
* @author Tomaz Fernandes
|
||||
*/
|
||||
@DisabledOnOs(OS.WINDOWS)
|
||||
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
|
||||
class KafkaAutoConfigurationIntegrationTests {
|
||||
|
||||
static final String TEST_TOPIC = "testTopic";
|
||||
static final String TEST_RETRY_TOPIC = "testRetryTopic";
|
||||
|
||||
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
|
||||
|
||||
|
|
@ -89,6 +95,27 @@ class KafkaAutoConfigurationIntegrationTests {
|
|||
producer.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void testEndToEndWithRetryTopics() throws Exception {
|
||||
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
|
||||
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
|
||||
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms",
|
||||
"spring.kafka.consumer.auto-offset-reset=earliest");
|
||||
RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class);
|
||||
assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay)
|
||||
.containsExactly(0L, 100L, 200L, 300L, 300L, 0L);
|
||||
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
|
||||
template.send(TEST_RETRY_TOPIC, "foo", "bar");
|
||||
RetryListener listener = this.context.getBean(RetryListener.class);
|
||||
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo",
|
||||
"bar");
|
||||
assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic",
|
||||
"testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStreams() {
|
||||
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
|
||||
|
|
@ -121,6 +148,11 @@ class KafkaAutoConfigurationIntegrationTests {
|
|||
return new Listener();
|
||||
}
|
||||
|
||||
@Bean
|
||||
RetryListener retryListener() {
|
||||
return new RetryListener();
|
||||
}
|
||||
|
||||
@Bean
|
||||
NewTopic adminCreated() {
|
||||
return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build();
|
||||
|
|
@ -157,4 +189,38 @@ class KafkaAutoConfigurationIntegrationTests {
|
|||
|
||||
}
|
||||
|
||||
static class RetryListener {
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(5);
|
||||
|
||||
private final List<String> topics = new ArrayList<>();
|
||||
|
||||
private volatile String received;
|
||||
|
||||
private volatile String key;
|
||||
|
||||
@KafkaListener(topics = TEST_RETRY_TOPIC)
|
||||
void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
|
||||
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
|
||||
this.received = foo;
|
||||
this.key = key;
|
||||
this.topics.add(topic);
|
||||
this.latch.countDown();
|
||||
throw new RuntimeException("Test exception");
|
||||
}
|
||||
|
||||
private List<String> getTopics() {
|
||||
return this.topics;
|
||||
}
|
||||
|
||||
private String getReceived() {
|
||||
return this.received;
|
||||
}
|
||||
|
||||
private String getKey() {
|
||||
return this.key;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,6 +65,8 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
|||
import org.springframework.kafka.listener.ErrorHandler;
|
||||
import org.springframework.kafka.listener.RecordInterceptor;
|
||||
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
||||
import org.springframework.kafka.retrytopic.DestinationTopic;
|
||||
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
|
||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
||||
import org.springframework.kafka.support.converter.BatchMessageConverter;
|
||||
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
|
||||
|
|
@ -76,6 +78,7 @@ import org.springframework.test.util.ReflectionTestUtils;
|
|||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.entry;
|
||||
import static org.assertj.core.api.Assertions.tuple;
|
||||
import static org.mockito.BDDMockito.then;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
|
@ -87,6 +90,7 @@ import static org.mockito.Mockito.never;
|
|||
* @author Stephane Nicoll
|
||||
* @author Eddú Meléndez
|
||||
* @author Nakul Mishra
|
||||
* @author Tomaz Fernandes
|
||||
*/
|
||||
class KafkaAutoConfigurationTests {
|
||||
|
||||
|
|
@ -317,6 +321,113 @@ class KafkaAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithExponentialBackOff() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
|
||||
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms").run((context) -> {
|
||||
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class);
|
||||
assertThat(configuration.getDestinationTopicProperties()).hasSize(6)
|
||||
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
|
||||
.containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"),
|
||||
tuple(300L, "-retry-2"), tuple(300L, "-retry-3"), tuple(0L, "-dlt"));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithDefaultProperties() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true")
|
||||
.run((context) -> {
|
||||
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class);
|
||||
assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
|
||||
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
|
||||
.containsExactly(tuple(0L, ""), tuple(1000L, "-retry"), tuple(0L, "-dlt"));
|
||||
assertThat(configuration.forKafkaTopicAutoCreation()).extracting("shouldCreateTopics")
|
||||
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN).isFalse();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithFixedBackOff() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s")
|
||||
.run((context) -> assertThat(
|
||||
context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3)
|
||||
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L));
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithNoBackOff() {
|
||||
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
|
||||
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")
|
||||
.run((context) -> assertThat(
|
||||
context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3)
|
||||
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L));
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithNegativeDelay() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
|
||||
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.delay=-1")
|
||||
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
|
||||
.isInstanceOf(IllegalArgumentException.class).message()
|
||||
.isEqualTo("Property spring.kafka.retry.topic.delay"
|
||||
+ " should be greater than or equal to 0. Provided value was -1."));
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithNegativeMultiplier() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
|
||||
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.multiplier=-1")
|
||||
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
|
||||
.isInstanceOf(IllegalArgumentException.class).message()
|
||||
.isEqualTo("Property spring.kafka.retry.topic.multiplier"
|
||||
+ " should be greater than or equal to 0. Provided value was -1.0."));
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithNegativeMaxDelay() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
|
||||
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.maxDelay=-1")
|
||||
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
|
||||
.isInstanceOf(IllegalArgumentException.class).message()
|
||||
.isEqualTo("Property spring.kafka.retry.topic.maxDelay"
|
||||
+ " should be greater than or equal to 0. Provided value was -1."));
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithZeroAttempts() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
|
||||
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=0")
|
||||
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
|
||||
.isInstanceOf(IllegalArgumentException.class).message()
|
||||
.isEqualTo("Property spring.kafka.retry.topic.attempts"
|
||||
+ " should be greater than or equal to 1. Provided value was 0."));
|
||||
}
|
||||
|
||||
@Test
|
||||
void retryTopicConfigurationWithZeroMultiplierAndRandomBackOff() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.application.name=my-test-app",
|
||||
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
|
||||
"spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.randomBackOff=true")
|
||||
.run((context) -> assertThat(context.getStartupFailure()).getRootCause()
|
||||
.isInstanceOf(IllegalArgumentException.class).message().isEqualTo(
|
||||
"Property spring.kafka.retry.topic.randomBackOff should not be true with non-exponential back offs."));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void streamsWithSeveralStreamsBuilderFactoryBeans() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue