Merge pull request #42120 from vpavic
* gh-42120: Improve Pulsar listener container concurrency configuration Closes gh-42120
This commit is contained in:
commit
7deb74af76
|
@ -69,7 +69,6 @@ import org.springframework.pulsar.transaction.PulsarTransactionManager;
|
|||
* @author Alexander Preuß
|
||||
* @author Phillip Webb
|
||||
* @author Jonas Geiregat
|
||||
* @author Vedran Pavic
|
||||
* @since 3.2.0
|
||||
*/
|
||||
@AutoConfiguration
|
||||
|
@ -188,10 +187,7 @@ public class PulsarAutoConfiguration {
|
|||
}
|
||||
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
|
||||
this.propertiesMapper.customizeContainerProperties(containerProperties);
|
||||
ConcurrentPulsarListenerContainerFactory<Object> listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
|
||||
pulsarConsumerFactory, containerProperties);
|
||||
this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
|
||||
return listenerContainerFactory;
|
||||
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuil
|
|||
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.boot.json.JsonWriter;
|
||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.core.PulsarTemplate;
|
||||
import org.springframework.pulsar.listener.PulsarContainerProperties;
|
||||
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
|
||||
|
@ -197,17 +196,10 @@ final class PulsarPropertiesMapper {
|
|||
PulsarProperties.Listener properties = this.properties.getListener();
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
|
||||
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
|
||||
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
|
||||
}
|
||||
|
||||
@SuppressWarnings("removal")
|
||||
<T> void customizeConcurrentPulsarListenerContainerFactory(
|
||||
ConcurrentPulsarListenerContainerFactory<T> listenerContainerFactory) {
|
||||
PulsarProperties.Listener properties = this.properties.getListener();
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency);
|
||||
}
|
||||
|
||||
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
|
||||
PulsarProperties.Reader properties = this.properties.getReader();
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
|
||||
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
|
||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.core.PulsarProducerFactory;
|
||||
import org.springframework.pulsar.core.PulsarTemplate;
|
||||
import org.springframework.pulsar.listener.PulsarContainerProperties;
|
||||
|
@ -264,6 +263,7 @@ class PulsarPropertiesMapperTests {
|
|||
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
|
||||
properties.getConsumer().getSubscription().setName("my-subscription");
|
||||
properties.getListener().setSchemaType(SchemaType.AVRO);
|
||||
properties.getListener().setConcurrency(10);
|
||||
properties.getListener().setObservationEnabled(true);
|
||||
properties.getTransaction().setEnabled(true);
|
||||
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
|
||||
|
@ -271,22 +271,11 @@ class PulsarPropertiesMapperTests {
|
|||
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
|
||||
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
|
||||
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
|
||||
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
|
||||
assertThat(containerProperties.isObservationEnabled()).isTrue();
|
||||
assertThat(containerProperties.transactions().isEnabled()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("removal")
|
||||
void customizeConcurrentPulsarListenerContainerFactory() {
|
||||
PulsarProperties properties = new PulsarProperties();
|
||||
properties.getListener().setConcurrency(10);
|
||||
ConcurrentPulsarListenerContainerFactory<?> listenerContainerFactory = mock(
|
||||
ConcurrentPulsarListenerContainerFactory.class);
|
||||
new PulsarPropertiesMapper(properties)
|
||||
.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
|
||||
then(listenerContainerFactory).should().setConcurrency(10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void customizeReaderBuilder() {
|
||||
|
|
Loading…
Reference in New Issue