Add property to configure Spring Kafka's authExceptionRetryInterval

Closes gh-44199
This commit is contained in:
Moritz Halbritter 2025-02-19 09:14:57 +01:00
parent 8c132711e4
commit 53405a48c2
3 changed files with 25 additions and 2 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2024 the original author or authors.
* Copyright 2012-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -237,6 +237,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(properties::isImmediateStop).to(container::setStopImmediate);
map.from(properties::isObservationEnabled).to(container::setObservationEnabled);
map.from(properties::getAuthExceptionRetryInterval).to(container::setAuthExceptionRetryInterval);
map.from(this.transactionManager).to(container::setKafkaAwareTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);

View File

@ -1080,6 +1080,11 @@ public class KafkaProperties {
*/
private boolean observationEnabled;
/**
* Time between retries after authentication exceptions.
*/
private Duration authExceptionRetryInterval;
public Type getType() {
return this.type;
}
@ -1232,6 +1237,14 @@ public class KafkaProperties {
this.observationEnabled = observationEnabled;
}
public Duration getAuthExceptionRetryInterval() {
return this.authExceptionRetryInterval;
}
public void setAuthExceptionRetryInterval(Duration authExceptionRetryInterval) {
this.authExceptionRetryInterval = authExceptionRetryInterval;
}
}
public static class Ssl {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,7 @@
package org.springframework.boot.autoconfigure.kafka;
import java.time.Duration;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
@ -80,4 +81,12 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
assertThat(this.factory.getContainerProperties().getListenerTaskExecutor()).isEqualTo(executor);
}
@Test
void shouldApplyAuthExceptionRetryInterval() {
this.properties.getListener().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
this.configurer.configure(this.factory, this.consumerFactory);
assertThat(this.factory.getContainerProperties().getAuthExceptionRetryInterval())
.isEqualTo(Duration.ofSeconds(10));
}
}