diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 4b8a39b7d5b..b7d40d0195b 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -46,11 +46,13 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { * instance to configure * @param consumerFactory the {@link ConsumerFactory} to use */ - public void configure(ConcurrentKafkaListenerContainerFactory listenerContainerFactory, + public void configure( + ConcurrentKafkaListenerContainerFactory listenerContainerFactory, ConsumerFactory consumerFactory) { listenerContainerFactory.setConsumerFactory(consumerFactory); Listener container = this.properties.getListener(); - ContainerProperties containerProperties = listenerContainerFactory.getContainerProperties(); + ContainerProperties containerProperties = listenerContainerFactory + .getContainerProperties(); if (container.getAckMode() != null) { containerProperties.setAckMode(container.getAckMode()); } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 05ae3ecd263..882119a60f2 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -54,13 +54,11 @@ class KafkaAnnotationDrivenConfiguration { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory kafkaConsumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory(); + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); configurer.configure(factory, kafkaConsumerFactory); return factory; } - @EnableKafka @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) protected static class EnableKafkaConfiguration { diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 993bc531d17..92fad052117 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -54,8 +54,8 @@ public class KafkaAutoConfiguration { public KafkaTemplate kafkaTemplate( ProducerFactory kafkaProducerFactory, ProducerListener kafkaProducerListener) { - KafkaTemplate kafkaTemplate = - new KafkaTemplate(kafkaProducerFactory); + KafkaTemplate kafkaTemplate = new KafkaTemplate( + kafkaProducerFactory); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e81133c0ec2..d091b3d6028 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -36,8 +36,8 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo /** * Configuration properties for Spring for Apache Kafka. - *

- * Users should refer to kafka documentation for complete descriptions of these + *

+ * Users should refer to Kafka documentation for complete descriptions of these * properties. * * @author Gary Russell @@ -63,8 +63,8 @@ public class KafkaProperties { * Comma-delimited list of host:port pairs to use for establishing the initial * connection to the Kafka cluster. */ - private List bootstrapServers = new ArrayList(Collections.singletonList( - "localhost:9092")); + private List bootstrapServers = new ArrayList( + Collections.singletonList("localhost:9092")); /** * Id to pass to the server when making requests; used for server-side logging. @@ -110,7 +110,8 @@ public class KafkaProperties { private Map buildCommonProperties() { Map properties = new HashMap(); if (this.bootstrapServers != null) { - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + this.bootstrapServers); } if (this.clientId != null) { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); @@ -139,10 +140,11 @@ public class KafkaProperties { /** * Create an initial map of consumer properties from the state of this instance. - *

This allows you to add additional properties, if necessary, and override the + *

+ * This allows you to add additional properties, if necessary, and override the * default kafkaConsumerFactory bean. - * @return the consumer properties initialized with the customizations defined on - * this instance + * @return the consumer properties initialized with the customizations defined on this + * instance */ public Map buildConsumerProperties() { Map props = buildCommonProperties(); @@ -152,10 +154,11 @@ public class KafkaProperties { /** * Create an initial map of producer properties from the state of this instance. - *

This allows you to add additional properties, if necessary, and override the + *

+ * This allows you to add additional properties, if necessary, and override the * default kafkaProducerFactory bean. - * @return the producer properties initialized with the customizations defined on - * this instance + * @return the producer properties initialized with the customizations defined on this + * instance */ public Map buildProducerProperties() { Map props = buildCommonProperties(); @@ -168,8 +171,9 @@ public class KafkaProperties { return resource.getFile().getAbsolutePath(); } catch (IOException ex) { - throw new IllegalStateException(String.format( - "Resource '%s' must be on a file system", resource), ex); + throw new IllegalStateException( + String.format("Resource '%s' must be on a file system", resource), + ex); } } @@ -178,8 +182,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); /** - * Frequency in milliseconds that the consumer offsets are auto-committed to - * Kafka if 'enable.auto.commit' true. + * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka + * if 'enable.auto.commit' true. */ private Long autoCommitInterval; @@ -332,22 +336,27 @@ public class KafkaProperties { public Map buildProperties() { Map properties = new HashMap(); if (this.autoCommitInterval != null) { - properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval); + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + this.autoCommitInterval); } if (this.autoOffsetReset != null) { - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + this.autoOffsetReset); } if (this.bootstrapServers != null) { - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.bootstrapServers); } if (this.clientId != null) { properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId); } if (this.enableAutoCommit != null) { - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + this.enableAutoCommit); } if (this.fetchMaxWait != null) { - properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWait); + properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, + this.fetchMaxWait); } if (this.fetchMinSize != null) { properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize); @@ -356,29 +365,36 @@ public class KafkaProperties { properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); } if (this.heartbeatInterval != null) { - properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, + this.heartbeatInterval); } if (this.keyDeserializer != null) { - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + this.keyDeserializer); } if (this.ssl.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword()); + properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, + this.ssl.getKeyPassword()); } if (this.ssl.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation())); + properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + resourceToPath(this.ssl.getKeystoreLocation())); } if (this.ssl.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword()); + properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + this.ssl.getKeystorePassword()); } if (this.ssl.getTruststoreLocation() != null) { properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getTruststoreLocation())); } if (this.ssl.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); + properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + this.ssl.getTruststorePassword()); } if (this.valueDeserializer != null) { - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + this.valueDeserializer); } return properties; } @@ -407,8 +423,8 @@ public class KafkaProperties { private List bootstrapServers; /** - * Total bytes of memory the producer can use to buffer records waiting to be - * sent to the server. + * Total bytes of memory the producer can use to buffer records waiting to be sent + * to the server. */ private Long bufferMemory; @@ -522,7 +538,8 @@ public class KafkaProperties { properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize); } if (this.bootstrapServers != null) { - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.bootstrapServers); } if (this.bufferMemory != null) { properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory); @@ -531,32 +548,39 @@ public class KafkaProperties { properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); } if (this.compressionType != null) { - properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType); + properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, + this.compressionType); } if (this.keySerializer != null) { - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + this.keySerializer); } if (this.retries != null) { properties.put(ProducerConfig.RETRIES_CONFIG, this.retries); } if (this.ssl.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword()); + properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, + this.ssl.getKeyPassword()); } if (this.ssl.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation())); + properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + resourceToPath(this.ssl.getKeystoreLocation())); } if (this.ssl.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword()); + properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + this.ssl.getKeystorePassword()); } if (this.ssl.getTruststoreLocation() != null) { properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getTruststoreLocation())); } if (this.ssl.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); + properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + this.ssl.getTruststorePassword()); } if (this.valueSerializer != null) { - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + this.valueSerializer); } return properties; } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index 42f097f78be..2b9b06f77dc 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -44,8 +44,8 @@ public class KafkaAutoConfigurationIntegrationTests { private static final String TEST_TOPIC = "testTopic"; @ClassRule - public static final KafkaEmbedded kafkaEmbedded = - new KafkaEmbedded(1, true, TEST_TOPIC); + public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, + TEST_TOPIC); private AnnotationConfigApplicationContext context; @@ -63,7 +63,8 @@ public class KafkaAutoConfigurationIntegrationTests { "spring.kafka.consumer.group-id=testGroup", "spring.kafka.consumer.auto-offset-reset=earliest"); @SuppressWarnings("unchecked") - KafkaTemplate template = this.context.getBean(KafkaTemplate.class); + KafkaTemplate template = this.context + .getBean(KafkaTemplate.class); template.send(TEST_TOPIC, "foo", "bar"); Listener listener = this.context.getBean(Listener.class); assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index c86d8f01b78..a5240b6fda8 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -41,13 +41,11 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo import static org.assertj.core.api.Assertions.assertThat; - /** * Tests for {@link KafkaAutoConfiguration}. * * @author Gary Russell * @author Stephane Nicoll - * @since 1.5 */ public class KafkaAutoConfigurationTests { @@ -78,40 +76,50 @@ public class KafkaAutoConfigurationTests { "spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer"); - DefaultKafkaConsumerFactory consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class); + DefaultKafkaConsumerFactory consumerFactory = this.context + .getBean(DefaultKafkaConsumerFactory.class); @SuppressWarnings("unchecked") - Map consumerProps = (Map) new DirectFieldAccessor(consumerFactory) - .getPropertyValue("configs"); + Map consumerProps = (Map) new DirectFieldAccessor( + consumerFactory).getPropertyValue("configs"); // common assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) .isEqualTo(Collections.singletonList("foo:1234")); assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1"); assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "ksLoc"); - assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2"); + assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) + .isEqualTo("p2"); assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "tsLoc"); - assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3"); + assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p3"); // consumer assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override - assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE); - assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123L); - assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest"); - assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); - assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789); + assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) + .isEqualTo(Boolean.FALSE); + assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) + .isEqualTo(123L); + assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + .isEqualTo("earliest"); + assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)) + .isEqualTo(456); + assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)) + .isEqualTo(789); assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); - assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234); - assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class); + assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)) + .isEqualTo(234); + assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + .isEqualTo(LongDeserializer.class); assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerDeserializer.class); } @Test public void producerProperties() { - load("spring.kafka.clientId=cid", - "spring.kafka.producer.acks=all", + load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20", - "spring.kafka.producer.bootstrap-servers=bar:1234", // test override common + "spring.kafka.producer.bootstrap-servers=bar:1234", // test override + // common "spring.kafka.producer.buffer-memory=12345", "spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", @@ -122,10 +130,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", "spring.kafka.producer.ssl.truststore-password=p6", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer"); - DefaultKafkaProducerFactory producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class); + DefaultKafkaProducerFactory producerFactory = this.context + .getBean(DefaultKafkaProducerFactory.class); @SuppressWarnings("unchecked") - Map producerProps = (Map) new DirectFieldAccessor(producerFactory) - .getPropertyValue("configs"); + Map producerProps = (Map) new DirectFieldAccessor( + producerFactory).getPropertyValue("configs"); // common assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); // producer @@ -133,18 +142,24 @@ public class KafkaAutoConfigurationTests { assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20); assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) .isEqualTo(Collections.singletonList("bar:1234")); // override - assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L); - assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); - assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class); + assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)) + .isEqualTo(12345L); + assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)) + .isEqualTo("gzip"); + assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .isEqualTo(LongSerializer.class); assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "ksLocP"); - assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); + assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) + .isEqualTo("p5"); assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "tsLocP"); - assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6"); + assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p6"); assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); - assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class); + assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + .isEqualTo(IntegerSerializer.class); } @Test @@ -156,21 +171,25 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000"); - DefaultKafkaProducerFactory producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class); - DefaultKafkaConsumerFactory consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class); + DefaultKafkaProducerFactory producerFactory = this.context + .getBean(DefaultKafkaProducerFactory.class); + DefaultKafkaConsumerFactory consumerFactory = this.context + .getBean(DefaultKafkaConsumerFactory.class); KafkaTemplate kafkaTemplate = this.context.getBean(KafkaTemplate.class); KafkaListenerContainerFactory kafkaListenerContainerFactory = this.context .getBean(KafkaListenerContainerFactory.class); - assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory")) - .isEqualTo(producerFactory); + assertThat(new DirectFieldAccessor(kafkaTemplate) + .getPropertyValue("producerFactory")).isEqualTo(producerFactory); assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory); assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory); - assertThat(dfa.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL); + assertThat(dfa.getPropertyValue("containerProperties.ackMode")) + .isEqualTo(AckMode.MANUAL); assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123); assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); - assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L); + assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) + .isEqualTo(2000L); } private void load(String... environment) {