Start building against Spring Kafka 3.0.3 snapshots
See gh-34153
This commit is contained in:
parent
cd43b4e805
commit
eda59f71b0
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -29,7 +29,8 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
|||
import org.springframework.kafka.listener.ContainerProperties;
|
||||
import org.springframework.kafka.listener.RecordInterceptor;
|
||||
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
||||
import org.springframework.kafka.support.converter.MessageConverter;
|
||||
import org.springframework.kafka.support.converter.BatchMessageConverter;
|
||||
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
|
||||
|
||||
/**
|
||||
|
@ -43,7 +44,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
|
||||
private KafkaProperties properties;
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
private BatchMessageConverter batchMessageConverter;
|
||||
|
||||
private RecordMessageConverter recordMessageConverter;
|
||||
|
||||
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
|
||||
|
||||
|
@ -68,11 +71,19 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the {@link MessageConverter} to use.
|
||||
* @param messageConverter the message converter
|
||||
* Set the {@link BatchMessageConverter} to use.
|
||||
* @param batchMessageConverter the message converter
|
||||
*/
|
||||
void setMessageConverter(MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
|
||||
this.batchMessageConverter = batchMessageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link RecordMessageConverter} to use.
|
||||
* @param recordMessageConverter the message converter
|
||||
*/
|
||||
void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
|
||||
this.recordMessageConverter = recordMessageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -151,7 +162,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
Listener properties = this.properties.getListener();
|
||||
map.from(properties::getConcurrency).to(factory::setConcurrency);
|
||||
map.from(this.messageConverter).to(factory::setMessageConverter);
|
||||
map.from(this.batchMessageConverter).to(factory::setBatchMessageConverter);
|
||||
map.from(this.recordMessageConverter).to(factory::setRecordMessageConverter);
|
||||
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
|
||||
map.from(this.replyTemplate).to(factory::setReplyTemplate);
|
||||
if (properties.getType().equals(Listener.Type.BATCH)) {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -19,7 +19,6 @@ package org.springframework.boot.autoconfigure.kafka;
|
|||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
|
@ -35,7 +34,6 @@ import org.springframework.kafka.listener.RecordInterceptor;
|
|||
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
||||
import org.springframework.kafka.support.converter.BatchMessageConverter;
|
||||
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
|
||||
import org.springframework.kafka.support.converter.MessageConverter;
|
||||
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
|
||||
|
||||
|
@ -51,7 +49,7 @@ class KafkaAnnotationDrivenConfiguration {
|
|||
|
||||
private final KafkaProperties properties;
|
||||
|
||||
private final RecordMessageConverter messageConverter;
|
||||
private final RecordMessageConverter recordMessageConverter;
|
||||
|
||||
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
|
||||
|
||||
|
@ -70,7 +68,7 @@ class KafkaAnnotationDrivenConfiguration {
|
|||
private final RecordInterceptor<Object, Object> recordInterceptor;
|
||||
|
||||
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
|
||||
ObjectProvider<RecordMessageConverter> messageConverter,
|
||||
ObjectProvider<RecordMessageConverter> recordMessageConverter,
|
||||
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
|
||||
ObjectProvider<BatchMessageConverter> batchMessageConverter,
|
||||
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
|
||||
|
@ -80,10 +78,10 @@ class KafkaAnnotationDrivenConfiguration {
|
|||
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
|
||||
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
|
||||
this.properties = properties;
|
||||
this.messageConverter = messageConverter.getIfUnique();
|
||||
this.recordMessageConverter = recordMessageConverter.getIfUnique();
|
||||
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
|
||||
this.batchMessageConverter = batchMessageConverter
|
||||
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
|
||||
.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
|
||||
this.kafkaTemplate = kafkaTemplate.getIfUnique();
|
||||
this.transactionManager = kafkaTransactionManager.getIfUnique();
|
||||
this.rebalanceListener = rebalanceListener.getIfUnique();
|
||||
|
@ -97,9 +95,8 @@ class KafkaAnnotationDrivenConfiguration {
|
|||
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
|
||||
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
|
||||
configurer.setKafkaProperties(this.properties);
|
||||
MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
|
||||
? this.batchMessageConverter : this.messageConverter;
|
||||
configurer.setMessageConverter(messageConverterToUse);
|
||||
configurer.setBatchMessageConverter(this.batchMessageConverter);
|
||||
configurer.setRecordMessageConverter(this.recordMessageConverter);
|
||||
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
|
||||
configurer.setReplyTemplate(this.kafkaTemplate);
|
||||
configurer.setTransactionManager(this.transactionManager);
|
||||
|
|
|
@ -519,7 +519,7 @@ class KafkaAutoConfigurationTests {
|
|||
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class).run((context) -> {
|
||||
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
|
||||
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
||||
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
|
||||
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("recordMessageConverter",
|
||||
context.getBean("myMessageConverter"));
|
||||
});
|
||||
}
|
||||
|
@ -531,7 +531,7 @@ class KafkaAutoConfigurationTests {
|
|||
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
|
||||
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
|
||||
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
||||
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
|
||||
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("batchMessageConverter",
|
||||
context.getBean("myBatchMessageConverter"));
|
||||
});
|
||||
}
|
||||
|
@ -543,7 +543,7 @@ class KafkaAutoConfigurationTests {
|
|||
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
|
||||
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
||||
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
|
||||
"messageConverter");
|
||||
"batchMessageConverter");
|
||||
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
|
||||
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter())
|
||||
.isSameAs(context.getBean("myMessageConverter"));
|
||||
|
@ -555,7 +555,8 @@ class KafkaAutoConfigurationTests {
|
|||
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
|
||||
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
|
||||
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
||||
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory, "messageConverter");
|
||||
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
|
||||
"batchMessageConverter");
|
||||
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
|
||||
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter()).isNull();
|
||||
});
|
||||
|
|
|
@ -1384,7 +1384,7 @@ bom {
|
|||
]
|
||||
}
|
||||
}
|
||||
library("Spring Kafka", "3.0.2") {
|
||||
library("Spring Kafka", "3.0.3-SNAPSHOT") {
|
||||
group("org.springframework.kafka") {
|
||||
modules = [
|
||||
"spring-kafka",
|
||||
|
|
Loading…
Reference in New Issue