From 28474aa30adebc397ecdb993d62317e5659fce4a Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 21 Dec 2016 14:58:07 -0800 Subject: [PATCH 1/2] Fix compatibility with Apache Kafka 0.10.1 Update KafkaProperties since Apache Kafka `0.10.1` changed the type for the `ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG` from the `Long` to `Integer`. Kafka includes the following conversion logic: case LONG: if (value instanceof Integer) return ((Integer) value).longValue(); if (value instanceof Long) return (Long) value; else if (value instanceof String) return Long.parseLong(trimmed); So we remain compatible with both `0.10.0` and `0.10.1` Closes gh-7723 --- .../boot/autoconfigure/kafka/KafkaProperties.java | 7 ++++--- .../autoconfigure/kafka/KafkaAutoConfigurationTests.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index d4e07744006..356c9fbcd51 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -43,6 +43,7 @@ import org.springframework.util.CollectionUtils; * * @author Gary Russell * @author Stephane Nicoll + * @author Artem Bilan * @since 1.5.0 */ @ConfigurationProperties(prefix = "spring.kafka") @@ -199,7 +200,7 @@ public class KafkaProperties { * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka * if 'enable.auto.commit' true. */ - private Long autoCommitInterval; + private Integer autoCommitInterval; /** * What to do when there is no initial offset in Kafka or if the current offset @@ -264,11 +265,11 @@ public class KafkaProperties { return this.ssl; } - public Long getAutoCommitInterval() { + public Integer getAutoCommitInterval() { return this.autoCommitInterval; } - public void setAutoCommitInterval(Long autoCommitInterval) { + public void setAutoCommitInterval(Integer autoCommitInterval) { this.autoCommitInterval = autoCommitInterval; } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index dc27e38da62..5dbb753df67 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -100,7 +100,7 @@ public class KafkaAutoConfigurationTests { assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) .isEqualTo(Boolean.FALSE); assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) - .isEqualTo(123L); + .isEqualTo(123); assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) .isEqualTo("earliest"); assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); From c700cf28cce44c33ed38de5012f271d349f788bc Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 21 Dec 2016 15:00:39 -0800 Subject: [PATCH 2/2] Fix typo in Kafka sample --- .../kafka/KafkaSpecialProducerConsumerConfigExample.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java index 23a7aa139d6..93c1f84b89c 100644 --- a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java +++ b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java @@ -64,11 +64,10 @@ public class KafkaSpecialProducerConsumerConfigExample { */ @Bean public ConsumerFactory kafkaConsumerFactory(KafkaProperties properties) { - Map consumererProperties = properties - .buildConsumerProperties(); - consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + Map consumerProperties = properties.buildConsumerProperties(); + consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MyConsumerMetricsReporter.class); - return new DefaultKafkaConsumerFactory(consumererProperties); + return new DefaultKafkaConsumerFactory(consumerProperties); } }