diff --git a/spring-boot-samples/README.adoc b/spring-boot-samples/README.adoc index 0e48ede4f86..c2d27ec1ad4 100644 --- a/spring-boot-samples/README.adoc +++ b/spring-boot-samples/README.adoc @@ -119,6 +119,9 @@ The following sample applications are provided: | link:spring-boot-sample-junit-jupiter[spring-boot-sample-junit-jupiter] | Demonstrates JUnit Jupiter-based testing +| link:spring-boot-sample-kafka[spring-boot-sample-kafka] +| consumer and producer using Apache Kafka + | link:spring-boot-sample-liquibase[spring-boot-sample-liquibase] | Database migrations with Liquibase @@ -230,6 +233,3 @@ The following sample applications are provided: | link:spring-boot-sample-xml[spring-boot-sample-xml] | Example show how Spring Boot can be mixed with traditional XML configuration (we generally recommend using Java `@Configuration` whenever possible - -| link:spring-boot-sample-kafka[spring-boot-sample-kafka] -| consumer and producer using Apache Kafka diff --git a/spring-boot-samples/pom.xml b/spring-boot-samples/pom.xml index 83792c9881b..e3a90ee1cc9 100644 --- a/spring-boot-samples/pom.xml +++ b/spring-boot-samples/pom.xml @@ -57,6 +57,7 @@ spring-boot-sample-jta-narayana spring-boot-sample-jta-jndi spring-boot-sample-junit-jupiter + spring-boot-sample-kafka spring-boot-sample-liquibase spring-boot-sample-logback spring-boot-sample-oauth2-client @@ -97,7 +98,6 @@ spring-boot-sample-websocket-undertow spring-boot-sample-webservices spring-boot-sample-xml - spring-boot-sample-kafka diff --git a/spring-boot-samples/spring-boot-sample-kafka/pom.xml b/spring-boot-samples/spring-boot-sample-kafka/pom.xml index 901631faa6e..e0d3ca27171 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/pom.xml +++ b/spring-boot-samples/spring-boot-sample-kafka/pom.xml @@ -20,16 +20,12 @@ org.springframework.boot - spring-boot-starter + spring-boot-starter-json org.springframework.kafka spring-kafka - - org.springframework.boot - spring-boot-starter-json - org.springframework.boot @@ -39,6 +35,7 @@ org.springframework.kafka spring-kafka-test + test diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java index 4e7e2a8b3d3..a18227b40a4 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java @@ -19,10 +19,11 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component -public class Consumer { +class Consumer { - @KafkaListener(topics = "myTopic") + @KafkaListener(topics = "testTopic") public void processMessage(SampleMessage message) { - System.out.println("consumer has received message : [" + message + "]"); + System.out.println("Received sample message [" + message + "]"); } + } \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java index 89c50e3533a..3297bb4780c 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java @@ -15,18 +15,21 @@ */ package sample.kafka; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class Producer { - @Autowired - private KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; + + Producer(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } public void send(SampleMessage message) { - kafkaTemplate.send("myTopic", message); - System.out.println("producer has sent message."); + this.kafkaTemplate.send("testTopic", message); + System.out.println("Sent sample message [" + message + "]"); } + } \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java index 9769c06acea..00e677b1f6b 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java @@ -15,8 +15,11 @@ */ package sample.kafka; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.Bean; @SpringBootApplication public class SampleKafkaApplication { @@ -24,4 +27,10 @@ public class SampleKafkaApplication { public static void main(String[] args) { SpringApplication.run(SampleKafkaApplication.class, args); } + + @Bean + public ApplicationRunner runner(Producer producer) { + return args -> producer.send(new SampleMessage(1, "A simple test message")); + } + } diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java index c0845fb491f..d11cd9a4f91 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java @@ -15,39 +15,37 @@ */ package sample.kafka; -public class SampleMessage { - private Integer id; - private String message; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; - public Integer getId() { - return id; +public class SampleMessage { + + private final Integer id; + + private final String message; + + @JsonCreator + public SampleMessage(@JsonProperty("id") Integer id, + @JsonProperty("message") String message) { + this.id = id; + this.message = message; } - public void setId(Integer id) { - this.id = id; + public Integer getId() { + return this.id; } public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public SampleMessage() { - } - - public SampleMessage(Integer id, String message) { - this.id = id; - this.message = message; + return this.message; } @Override public String toString() { - return "SampleMessage{" + - "id=" + id + - ", message='" + message + '\'' + - '}'; + final StringBuilder sb = new StringBuilder("SampleMessage{"); + sb.append("id=").append(this.id); + sb.append(", message='").append(this.message).append('\''); + sb.append('}'); + return sb.toString(); } + } diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties b/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties index 588dc54c56f..97dbed42b9b 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties @@ -1,5 +1,6 @@ spring.kafka.bootstrap-servers=localhost:9092 -spring.kafka.consumer.group-id=myGroup +spring.kafka.consumer.group-id=testGroup +spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer -spring.kafka.Producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.consumer.properties.spring.json.trusted.packages=sample.kafka \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java b/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java index 3f910199466..c6364d6d7d5 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java @@ -15,13 +15,20 @@ */ package sample.kafka; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.rule.OutputCapture; -import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -30,32 +37,41 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration tests for demo application. * * @author hcxin + * @author Gary Russell + * @author Stephane Nicoll */ @RunWith(SpringRunner.class) @SpringBootTest +@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}") +@EmbeddedKafka public class SampleKafkaApplicationTests { + private static final CountDownLatch latch = new CountDownLatch(1); + @Rule public OutputCapture outputCapture = new OutputCapture(); - @Autowired - private Producer producer; - @Test - public void sendSimpleMessage() throws Exception { - initKafkaEmbedded(); - SampleMessage message = new SampleMessage(1, "Test message"); - producer.send(message); - Thread.sleep(1000L); - assertThat(this.outputCapture.toString().contains("Test message")).isTrue(); + public void testVanillaExchange() throws Exception { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.outputCapture.toString().contains("A simple test message")) + .isTrue(); } - public void initKafkaEmbedded() throws Exception { - KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true); - embeddedKafka.setKafkaPorts(9092); - embeddedKafka.afterPropertiesSet(); - //Need 10s, waiting for the Kafka server start. - Thread.sleep(10000L); + @TestConfiguration + public static class Config { + + @Bean + public Consumer consumer() { + return new Consumer() { + @Override + public void processMessage(SampleMessage message) { + super.processMessage(message); + latch.countDown(); + } + }; + } } + }