Fix StreamsBuilderFactoryBean injection definition

Closes gh-16329
This commit is contained in:
Stephane Nicoll 2019-03-27 10:17:49 +01:00
parent 4900505425
commit c2f9e7dd43
2 changed files with 45 additions and 3 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2018 the original author or authors.
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -68,7 +69,7 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
StreamsBuilderFactoryBean factoryBean) {
@Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean) {
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2018 the original author or authors.
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -47,6 +47,7 @@ import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
@ -68,6 +69,8 @@ import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
/**
* Tests for {@link KafkaAutoConfiguration}.
@ -401,6 +404,29 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void streamsWithSeveralStreamsBuilderFactoryBeans() {
this.contextRunner
.withUserConfiguration(EnableKafkaStreamsConfiguration.class,
TestStreamsBuilderFactoryBeanConfiguration.class)
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.kafka.streams.auto-startup=false")
.run((context) -> {
Properties configs = context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class).asProperties();
assertThat(configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo("localhost:9092, localhost:9093");
verify(context.getBean("&firstStreamsBuilderFactoryBean",
StreamsBuilderFactoryBean.class), never())
.setAutoStartup(false);
verify(context.getBean("&secondStreamsBuilderFactoryBean",
StreamsBuilderFactoryBean.class), never())
.setAutoStartup(false);
});
}
@Test
public void streamsApplicationIdIsMandatory() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
@ -639,4 +665,19 @@ public class KafkaAutoConfigurationTests {
}
@Configuration
protected static class TestStreamsBuilderFactoryBeanConfiguration {
@Bean
public StreamsBuilderFactoryBean firstStreamsBuilderFactoryBean() {
return mock(StreamsBuilderFactoryBean.class);
}
@Bean
public StreamsBuilderFactoryBean secondStreamsBuilderFactoryBean() {
return mock(StreamsBuilderFactoryBean.class);
}
}
}