Polish "Auto-configure Kafka listener container with rebalance listener"
Closes gh-16755
This commit is contained in:
parent
abdc2e1b4f
commit
74208bb1a7
|
@ -48,14 +48,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
|
||||
private KafkaAwareTransactionManager<Object, Object> transactionManager;
|
||||
|
||||
private ConsumerAwareRebalanceListener rebalanceListener;
|
||||
|
||||
private ErrorHandler errorHandler;
|
||||
|
||||
private BatchErrorHandler batchErrorHandler;
|
||||
|
||||
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
|
||||
|
||||
private ConsumerAwareRebalanceListener rebalanceListener;
|
||||
|
||||
/**
|
||||
* Set the {@link KafkaProperties} to use.
|
||||
* @param properties the properties
|
||||
|
@ -89,6 +89,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
this.transactionManager = transactionManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link ConsumerAwareRebalanceListener} to use.
|
||||
* @param rebalanceListener the rebalance listener.
|
||||
* @since 2.2
|
||||
*/
|
||||
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
|
||||
this.rebalanceListener = rebalanceListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link ErrorHandler} to use.
|
||||
* @param errorHandler the error handler
|
||||
|
@ -114,15 +123,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
this.afterRollbackProcessor = afterRollbackProcessor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link ConsumerAwareRebalanceListener} to use.
|
||||
* @param rebalanceListener the rebalance listener.
|
||||
* @since 2.2
|
||||
*/
|
||||
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
|
||||
this.rebalanceListener = rebalanceListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the specified Kafka listener container factory. The factory can be
|
||||
* further tuned and default settings can be overridden.
|
||||
|
|
|
@ -58,33 +58,33 @@ class KafkaAnnotationDrivenConfiguration {
|
|||
|
||||
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
|
||||
|
||||
private final ConsumerAwareRebalanceListener rebalanceListener;
|
||||
|
||||
private final ErrorHandler errorHandler;
|
||||
|
||||
private final BatchErrorHandler batchErrorHandler;
|
||||
|
||||
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
|
||||
|
||||
private final ConsumerAwareRebalanceListener rebalanceListener;
|
||||
|
||||
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
|
||||
ObjectProvider<RecordMessageConverter> messageConverter,
|
||||
ObjectProvider<BatchMessageConverter> batchMessageConverter,
|
||||
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
|
||||
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
|
||||
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
|
||||
ObjectProvider<ErrorHandler> errorHandler,
|
||||
ObjectProvider<BatchErrorHandler> batchErrorHandler,
|
||||
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
|
||||
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener) {
|
||||
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
|
||||
this.properties = properties;
|
||||
this.messageConverter = messageConverter.getIfUnique();
|
||||
this.batchMessageConverter = batchMessageConverter.getIfUnique(
|
||||
() -> new BatchMessagingMessageConverter(this.messageConverter));
|
||||
this.kafkaTemplate = kafkaTemplate.getIfUnique();
|
||||
this.transactionManager = kafkaTransactionManager.getIfUnique();
|
||||
this.rebalanceListener = rebalanceListener.getIfUnique();
|
||||
this.errorHandler = errorHandler.getIfUnique();
|
||||
this.batchErrorHandler = batchErrorHandler.getIfUnique();
|
||||
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
|
||||
this.rebalanceListener = rebalanceListener.getIfUnique();
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
@ -97,10 +97,10 @@ class KafkaAnnotationDrivenConfiguration {
|
|||
configurer.setMessageConverter(messageConverterToUse);
|
||||
configurer.setReplyTemplate(this.kafkaTemplate);
|
||||
configurer.setTransactionManager(this.transactionManager);
|
||||
configurer.setRebalanceListener(this.rebalanceListener);
|
||||
configurer.setErrorHandler(this.errorHandler);
|
||||
configurer.setBatchErrorHandler(this.batchErrorHandler);
|
||||
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
|
||||
configurer.setRebalanceListener(this.rebalanceListener);
|
||||
return configurer;
|
||||
}
|
||||
|
||||
|
|
|
@ -767,8 +767,7 @@ public class KafkaAutoConfigurationTests {
|
|||
|
||||
@Bean
|
||||
public ConsumerAwareRebalanceListener rebalanceListener() {
|
||||
return new ConsumerAwareRebalanceListener() {
|
||||
};
|
||||
return mock(ConsumerAwareRebalanceListener.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6158,8 +6158,9 @@ The following component creates a listener endpoint on the `someTopic` topic:
|
|||
----
|
||||
|
||||
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
|
||||
container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is
|
||||
defined, it is automatically associated to the default factory.
|
||||
container factory. Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or
|
||||
`ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the
|
||||
default factory.
|
||||
|
||||
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean
|
||||
is associated to the default factory. If only a `RecordMessageConverter` bean is present
|
||||
|
|
Loading…
Reference in New Issue