diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 523418d4ecc..f820c3eccbb 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -75,6 +75,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = produceNConsume(userClicksTopic, userRegionsTopic, outputTopic) + + Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty) + streams.close() } @@ -115,6 +118,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = produceNConsume(userClicksTopic, userRegionsTopic, outputTopic) + + Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty) + streams.close() } @@ -163,7 +169,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ .groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong)) .reduce { new Reducer[JLong] { - def apply(v1: JLong, v2: JLong) = v1 + v2 + def apply(v1: JLong, v2: JLong): JLong = v1 + v2 } } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index a826401b455..6035dd0db37 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -48,16 +48,16 @@ import _root_.scala.collection.JavaConverters._ */ class TopologyTest extends JUnitSuite { - val inputTopic = "input-topic" - val userClicksTopic = "user-clicks-topic" - val userRegionsTopic = "user-regions-topic" + private val inputTopic = "input-topic" + private val userClicksTopic = "user-clicks-topic" + private val userRegionsTopic = "user-regions-topic" - val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS) + private val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS) - @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple() = { + @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple(): Unit = { // build the Scala topology - def getTopologyScala(): TopologyDescription = { + def getTopologyScala: TopologyDescription = { import Serdes._ @@ -71,27 +71,27 @@ class TopologyTest extends JUnitSuite { } // build the Java topology - def getTopologyJava(): TopologyDescription = { + def getTopologyJava: TopologyDescription = { val streamBuilder = new StreamsBuilderJ val textLines = streamBuilder.stream[String, String](inputTopic) - val _: KStreamJ[String, String] = textLines.flatMapValues { + val _: KStreamJ[String, String] = textLines.flatMapValues( new ValueMapper[String, java.lang.Iterable[String]] { def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava } - } + ) streamBuilder.build().describe() } // should match - assertEquals(getTopologyScala(), getTopologyJava()) + assertEquals(getTopologyScala, getTopologyJava) } - @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate() = { + @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate(): Unit = { // build the Scala topology - def getTopologyScala(): TopologyDescription = { + def getTopologyScala: TopologyDescription = { import Serdes._ @@ -101,23 +101,23 @@ class TopologyTest extends JUnitSuite { val _: KTable[String, Long] = textLines .flatMapValues(v => pattern.split(v.toLowerCase)) - .groupBy((k, v) => v) + .groupBy((_, v) => v) .count() streamBuilder.build().describe() } // build the Java topology - def getTopologyJava(): TopologyDescription = { + def getTopologyJava: TopologyDescription = { val streamBuilder = new StreamsBuilderJ val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic) - val splits: KStreamJ[String, String] = textLines.flatMapValues { + val splits: KStreamJ[String, String] = textLines.flatMapValues( new ValueMapper[String, java.lang.Iterable[String]] { def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava } - } + ) val grouped: KGroupedStreamJ[String, String] = splits.groupBy { new KeyValueMapper[String, String, String] { @@ -125,19 +125,19 @@ class TopologyTest extends JUnitSuite { } } - val wordCounts: KTableJ[String, java.lang.Long] = grouped.count() + grouped.count() streamBuilder.build().describe() } // should match - assertEquals(getTopologyScala(), getTopologyJava()) + assertEquals(getTopologyScala, getTopologyJava) } - @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin() = { + @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin(): Unit = { // build the Scala topology - def getTopologyScala(): TopologyDescription = { + def getTopologyScala: TopologyDescription = { import Serdes._ val builder = new StreamsBuilder() @@ -146,18 +146,18 @@ class TopologyTest extends JUnitSuite { val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) - val clicksPerRegion: KTable[String, Long] = - userClicksStream - .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) - .map((_, regionWithClicks) => regionWithClicks) - .groupByKey - .reduce(_ + _) + // clicks per region + userClicksStream + .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) + .map((_, regionWithClicks) => regionWithClicks) + .groupByKey + .reduce(_ + _) builder.build().describe() } // build the Java topology - def getTopologyJava(): TopologyDescription = { + def getTopologyJava: TopologyDescription = { import java.lang.{Long => JLong} @@ -190,11 +190,11 @@ class TopologyTest extends JUnitSuite { } // Compute the total per region by summing the individual click counts per region. - val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion + clicksByRegion .groupByKey(Grouped.`with`[String, JLong]) .reduce { new Reducer[JLong] { - def apply(v1: JLong, v2: JLong) = v1 + v2 + def apply(v1: JLong, v2: JLong): JLong = v1 + v2 } } @@ -202,20 +202,19 @@ class TopologyTest extends JUnitSuite { } // should match - assertEquals(getTopologyScala(), getTopologyJava()) + assertEquals(getTopologyScala, getTopologyJava) } - @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = { + @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform(): Unit = { // build the Scala topology - def getTopologyScala(): TopologyDescription = { + def getTopologyScala: TopologyDescription = { import Serdes._ val streamBuilder = new StreamsBuilder val textLines = streamBuilder.stream[String, String](inputTopic) - //noinspection ConvertExpressionToSAM due to 2.11 build val _: KTable[String, Long] = textLines .transform(new TransformerSupplier[String, String, KeyValue[String, String]] { @@ -229,14 +228,14 @@ class TopologyTest extends JUnitSuite { override def close(): Unit = Unit } }) - .groupBy((k, v) => v) + .groupBy((_, v) => v) .count() streamBuilder.build().describe() } // build the Java topology - def getTopologyJava(): TopologyDescription = { + def getTopologyJava: TopologyDescription = { val streamBuilder = new StreamsBuilderJ val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic) @@ -260,12 +259,13 @@ class TopologyTest extends JUnitSuite { } } - val wordCounts: KTableJ[String, java.lang.Long] = grouped.count() + // word counts + grouped.count() streamBuilder.build().describe() } // should match - assertEquals(getTopologyScala(), getTopologyJava()) + assertEquals(getTopologyScala, getTopologyJava) } }