MINOR: Pass materialized to the inner KTable instance (#11888)

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
Márton Sigmond 2022-03-21 05:03:04 -04:00 committed by GitHub
parent 3a8f6b17a6
commit e5eb180a6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 1 deletions

View File

@ -249,7 +249,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues` * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/ */
def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
new KTable(inner.mapValues[VR](mapper.asValueMapperWithKey)) new KTable(inner.mapValues[VR](mapper.asValueMapperWithKey, materialized))
/** /**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value

View File

@ -496,4 +496,42 @@ class KTableTest extends TestDriver {
assertTrue(joinNodeLeft.name().contains("my-name")) assertTrue(joinNodeLeft.name().contains("my-name"))
assertTrue(joinNodeRight.name().contains("my-name")) assertTrue(joinNodeRight.name().contains("my-name"))
} }
@Test
def testMapValuesWithValueMapperWithMaterialized(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val stateStore = "store"
val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore](stateStore)
val table = builder.stream[String, String](sourceTopic).toTable
table.mapValues(value => value.length.toLong, materialized)
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[String, String](sourceTopic)
testInput.pipeInput("1", "topic1value1")
assertEquals(12, testDriver.getKeyValueStore[String, Long](stateStore).get("1"))
testDriver.close()
}
@Test
def testMapValuesWithValueMapperWithKeyAndWithMaterialized(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val stateStore = "store"
val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore](stateStore)
val table = builder.stream[String, String](sourceTopic).toTable
table.mapValues((key, value) => key.length + value.length.toLong, materialized)
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[String, String](sourceTopic)
testInput.pipeInput("1", "topic1value1")
assertEquals(13, testDriver.getKeyValueStore[String, Long](stateStore).get("1"))
testDriver.close()
}
} }