Minor resolve streams scala warnings (#6369)

Resolves the compiler warnings when building streams-scala.

Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
John Roesler 2019-03-07 17:05:35 -06:00 committed by Guozhang Wang
parent 96c4e323bf
commit ccd3af1566
2 changed files with 44 additions and 38 deletions

View File

@ -75,6 +75,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
streams.close()
}
@ -115,6 +118,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
streams.close()
}
@ -163,7 +169,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
.groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong))
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
def apply(v1: JLong, v2: JLong): JLong = v1 + v2
}
}

View File

@ -48,16 +48,16 @@ import _root_.scala.collection.JavaConverters._
*/
class TopologyTest extends JUnitSuite {
val inputTopic = "input-topic"
val userClicksTopic = "user-clicks-topic"
val userRegionsTopic = "user-regions-topic"
private val inputTopic = "input-topic"
private val userClicksTopic = "user-clicks-topic"
private val userRegionsTopic = "user-regions-topic"
val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
private val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
@Test def shouldBuildIdenticalTopologyInJavaNScalaSimple() = {
@Test def shouldBuildIdenticalTopologyInJavaNScalaSimple(): Unit = {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
def getTopologyScala: TopologyDescription = {
import Serdes._
@ -71,27 +71,27 @@ class TopologyTest extends JUnitSuite {
}
// build the Java topology
def getTopologyJava(): TopologyDescription = {
def getTopologyJava: TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines = streamBuilder.stream[String, String](inputTopic)
val _: KStreamJ[String, String] = textLines.flatMapValues {
val _: KStreamJ[String, String] = textLines.flatMapValues(
new ValueMapper[String, java.lang.Iterable[String]] {
def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
}
}
)
streamBuilder.build().describe()
}
// should match
assertEquals(getTopologyScala(), getTopologyJava())
assertEquals(getTopologyScala, getTopologyJava)
}
@Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate() = {
@Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate(): Unit = {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
def getTopologyScala: TopologyDescription = {
import Serdes._
@ -101,23 +101,23 @@ class TopologyTest extends JUnitSuite {
val _: KTable[String, Long] =
textLines
.flatMapValues(v => pattern.split(v.toLowerCase))
.groupBy((k, v) => v)
.groupBy((_, v) => v)
.count()
streamBuilder.build().describe()
}
// build the Java topology
def getTopologyJava(): TopologyDescription = {
def getTopologyJava: TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
val splits: KStreamJ[String, String] = textLines.flatMapValues {
val splits: KStreamJ[String, String] = textLines.flatMapValues(
new ValueMapper[String, java.lang.Iterable[String]] {
def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
}
}
)
val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
new KeyValueMapper[String, String, String] {
@ -125,19 +125,19 @@ class TopologyTest extends JUnitSuite {
}
}
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
grouped.count()
streamBuilder.build().describe()
}
// should match
assertEquals(getTopologyScala(), getTopologyJava())
assertEquals(getTopologyScala, getTopologyJava)
}
@Test def shouldBuildIdenticalTopologyInJavaNScalaJoin() = {
@Test def shouldBuildIdenticalTopologyInJavaNScalaJoin(): Unit = {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
def getTopologyScala: TopologyDescription = {
import Serdes._
val builder = new StreamsBuilder()
@ -146,18 +146,18 @@ class TopologyTest extends JUnitSuite {
val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
val clicksPerRegion: KTable[String, Long] =
userClicksStream
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
.map((_, regionWithClicks) => regionWithClicks)
.groupByKey
.reduce(_ + _)
// clicks per region
userClicksStream
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
.map((_, regionWithClicks) => regionWithClicks)
.groupByKey
.reduce(_ + _)
builder.build().describe()
}
// build the Java topology
def getTopologyJava(): TopologyDescription = {
def getTopologyJava: TopologyDescription = {
import java.lang.{Long => JLong}
@ -190,11 +190,11 @@ class TopologyTest extends JUnitSuite {
}
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
clicksByRegion
.groupByKey(Grouped.`with`[String, JLong])
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
def apply(v1: JLong, v2: JLong): JLong = v1 + v2
}
}
@ -202,20 +202,19 @@ class TopologyTest extends JUnitSuite {
}
// should match
assertEquals(getTopologyScala(), getTopologyJava())
assertEquals(getTopologyScala, getTopologyJava)
}
@Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = {
@Test def shouldBuildIdenticalTopologyInJavaNScalaTransform(): Unit = {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
def getTopologyScala: TopologyDescription = {
import Serdes._
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
//noinspection ConvertExpressionToSAM due to 2.11 build
val _: KTable[String, Long] =
textLines
.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
@ -229,14 +228,14 @@ class TopologyTest extends JUnitSuite {
override def close(): Unit = Unit
}
})
.groupBy((k, v) => v)
.groupBy((_, v) => v)
.count()
streamBuilder.build().describe()
}
// build the Java topology
def getTopologyJava(): TopologyDescription = {
def getTopologyJava: TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
@ -260,12 +259,13 @@ class TopologyTest extends JUnitSuite {
}
}
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
// word counts
grouped.count()
streamBuilder.build().describe()
}
// should match
assertEquals(getTopologyScala(), getTopologyJava())
assertEquals(getTopologyScala, getTopologyJava)
}
}