Merge pull request #17322 from garyrussell

* gh-17322:
  Auto-configure KLC with user-provided RecordInterceptor

Closes gh-17322
This commit is contained in:
Andy Wilkinson 2019-06-29 22:06:24 +01:00
commit 4c812b065d
3 changed files with 39 additions and 1 deletions

View File

@ -28,6 +28,7 @@ import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
@ -56,6 +57,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor; private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private RecordInterceptor<Object, Object> recordInterceptor;
/** /**
* Set the {@link KafkaProperties} to use. * Set the {@link KafkaProperties} to use.
* @param properties the properties * @param properties the properties
@ -121,6 +124,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.afterRollbackProcessor = afterRollbackProcessor; this.afterRollbackProcessor = afterRollbackProcessor;
} }
/**
* Set the {@link RecordInterceptor} to use.
* @param recordInterceptor the record interceptor.
*/
void setRecordInterceptor(RecordInterceptor<Object, Object> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}
/** /**
* Configure the specified Kafka listener container factory. The factory can be * Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden. * further tuned and default settings can be overridden.
@ -149,6 +160,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
factory.setErrorHandler(this.errorHandler); factory.setErrorHandler(this.errorHandler);
} }
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
} }
private void configureContainer(ContainerProperties container) { private void configureContainer(ContainerProperties container) {

View File

@ -31,6 +31,7 @@ import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessageConverter;
@ -65,6 +66,8 @@ class KafkaAnnotationDrivenConfiguration {
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor; private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private final RecordInterceptor<Object, Object> recordInterceptor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter, ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<BatchMessageConverter> batchMessageConverter, ObjectProvider<BatchMessageConverter> batchMessageConverter,
@ -72,7 +75,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager, ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler, ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler, ObjectProvider<BatchErrorHandler> batchErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) { ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfUnique(); this.messageConverter = messageConverter.getIfUnique();
this.batchMessageConverter = batchMessageConverter this.batchMessageConverter = batchMessageConverter
@ -83,6 +87,7 @@ class KafkaAnnotationDrivenConfiguration {
this.errorHandler = errorHandler.getIfUnique(); this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique(); this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
} }
@Bean @Bean
@ -99,6 +104,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setErrorHandler(this.errorHandler); configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler); configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
return configurer; return configurer;
} }

View File

@ -58,6 +58,7 @@ import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@ -537,6 +538,15 @@ class KafkaAutoConfigurationTests {
}); });
} }
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRecordInterceptor() {
this.contextRunner.withUserConfiguration(RecordInterceptorConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordInterceptor", context.getBean("recordInterceptor"));
});
}
@Test @Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() { void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> { this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> {
@ -621,6 +631,16 @@ class KafkaAutoConfigurationTests {
} }
@Configuration(proxyBeanMethods = false)
protected static class RecordInterceptorConfiguration {
@Bean
public RecordInterceptor<Object, Object> recordInterceptor() {
return (record) -> record;
}
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
protected static class RebalanceListenerConfiguration { protected static class RebalanceListenerConfiguration {