Merge pull request #27927 from garyrussell
* pr/27927: Polish "Auto-Configure Kafka CommonErrorHandler" Auto-Configure Kafka CommonErrorHandler Closes gh-27927
This commit is contained in:
commit
aa0329c24d
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2020 the original author or authors.
|
* Copyright 2012-2021 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -25,6 +25,7 @@ import org.springframework.kafka.core.ConsumerFactory;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
||||||
import org.springframework.kafka.listener.BatchErrorHandler;
|
import org.springframework.kafka.listener.BatchErrorHandler;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||||
import org.springframework.kafka.listener.ContainerProperties;
|
import org.springframework.kafka.listener.ContainerProperties;
|
||||||
import org.springframework.kafka.listener.ErrorHandler;
|
import org.springframework.kafka.listener.ErrorHandler;
|
||||||
|
|
@ -58,6 +59,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
||||||
|
|
||||||
private BatchErrorHandler batchErrorHandler;
|
private BatchErrorHandler batchErrorHandler;
|
||||||
|
|
||||||
|
private CommonErrorHandler commonErrorHandler;
|
||||||
|
|
||||||
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
|
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
|
||||||
|
|
||||||
private RecordInterceptor<Object, Object> recordInterceptor;
|
private RecordInterceptor<Object, Object> recordInterceptor;
|
||||||
|
|
@ -127,6 +130,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
||||||
this.batchErrorHandler = batchErrorHandler;
|
this.batchErrorHandler = batchErrorHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the {@link CommonErrorHandler} to use.
|
||||||
|
* @param commonErrorHandler the error handler.
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
|
||||||
|
this.commonErrorHandler = commonErrorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the {@link AfterRollbackProcessor} to use.
|
* Set the {@link AfterRollbackProcessor} to use.
|
||||||
* @param afterRollbackProcessor the after rollback processor
|
* @param afterRollbackProcessor the after rollback processor
|
||||||
|
|
@ -171,6 +183,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
||||||
else {
|
else {
|
||||||
factory.setErrorHandler(this.errorHandler);
|
factory.setErrorHandler(this.errorHandler);
|
||||||
}
|
}
|
||||||
|
map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler);
|
||||||
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
|
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
|
||||||
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
|
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2020 the original author or authors.
|
* Copyright 2012-2021 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -30,6 +30,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
||||||
import org.springframework.kafka.listener.BatchErrorHandler;
|
import org.springframework.kafka.listener.BatchErrorHandler;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||||
import org.springframework.kafka.listener.ErrorHandler;
|
import org.springframework.kafka.listener.ErrorHandler;
|
||||||
import org.springframework.kafka.listener.RecordInterceptor;
|
import org.springframework.kafka.listener.RecordInterceptor;
|
||||||
|
|
@ -68,6 +69,8 @@ class KafkaAnnotationDrivenConfiguration {
|
||||||
|
|
||||||
private final BatchErrorHandler batchErrorHandler;
|
private final BatchErrorHandler batchErrorHandler;
|
||||||
|
|
||||||
|
private final CommonErrorHandler commonErrorHandler;
|
||||||
|
|
||||||
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
|
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
|
||||||
|
|
||||||
private final RecordInterceptor<Object, Object> recordInterceptor;
|
private final RecordInterceptor<Object, Object> recordInterceptor;
|
||||||
|
|
@ -79,7 +82,7 @@ class KafkaAnnotationDrivenConfiguration {
|
||||||
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
|
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
|
||||||
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
|
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
|
||||||
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
|
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
|
||||||
ObjectProvider<BatchErrorHandler> batchErrorHandler,
|
ObjectProvider<BatchErrorHandler> batchErrorHandler, ObjectProvider<CommonErrorHandler> commonErrorHandler,
|
||||||
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
|
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
|
||||||
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
|
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
|
|
@ -92,6 +95,7 @@ class KafkaAnnotationDrivenConfiguration {
|
||||||
this.rebalanceListener = rebalanceListener.getIfUnique();
|
this.rebalanceListener = rebalanceListener.getIfUnique();
|
||||||
this.errorHandler = errorHandler.getIfUnique();
|
this.errorHandler = errorHandler.getIfUnique();
|
||||||
this.batchErrorHandler = batchErrorHandler.getIfUnique();
|
this.batchErrorHandler = batchErrorHandler.getIfUnique();
|
||||||
|
this.commonErrorHandler = commonErrorHandler.getIfUnique();
|
||||||
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
|
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
|
||||||
this.recordInterceptor = recordInterceptor.getIfUnique();
|
this.recordInterceptor = recordInterceptor.getIfUnique();
|
||||||
}
|
}
|
||||||
|
|
@ -110,6 +114,7 @@ class KafkaAnnotationDrivenConfiguration {
|
||||||
configurer.setRebalanceListener(this.rebalanceListener);
|
configurer.setRebalanceListener(this.rebalanceListener);
|
||||||
configurer.setErrorHandler(this.errorHandler);
|
configurer.setErrorHandler(this.errorHandler);
|
||||||
configurer.setBatchErrorHandler(this.batchErrorHandler);
|
configurer.setBatchErrorHandler(this.batchErrorHandler);
|
||||||
|
configurer.setCommonErrorHandler(this.commonErrorHandler);
|
||||||
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
|
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
|
||||||
configurer.setRecordInterceptor(this.recordInterceptor);
|
configurer.setRecordInterceptor(this.recordInterceptor);
|
||||||
return configurer;
|
return configurer;
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,7 @@ import org.springframework.kafka.core.KafkaAdmin;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
||||||
import org.springframework.kafka.listener.BatchErrorHandler;
|
import org.springframework.kafka.listener.BatchErrorHandler;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||||
import org.springframework.kafka.listener.ContainerProperties;
|
import org.springframework.kafka.listener.ContainerProperties;
|
||||||
import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
||||||
|
|
@ -546,6 +547,17 @@ class KafkaAutoConfigurationTests {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConcurrentKafkaListenerContainerFactoryWithCustomCommonErrorHandler() {
|
||||||
|
this.contextRunner.withBean("errorHandler", CommonErrorHandler.class, () -> mock(CommonErrorHandler.class))
|
||||||
|
.run((context) -> {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
||||||
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
||||||
|
assertThat(factory).hasFieldOrPropertyWithValue("commonErrorHandler",
|
||||||
|
context.getBean("errorHandler"));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() {
|
void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() {
|
||||||
this.contextRunner.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test").run((context) -> {
|
this.contextRunner.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test").run((context) -> {
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ include::{docs-java}/features/messaging/kafka/receiving/MyBean.java[]
|
||||||
----
|
----
|
||||||
|
|
||||||
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory.
|
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory.
|
||||||
Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
|
Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `CommonErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
|
||||||
|
|
||||||
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory.
|
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory.
|
||||||
If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.
|
If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue