KAFKA-5844; add groupBy(selector, serialized) to KTable

Add `KTable#groupBy(KeyValueMapper, Serialized)` and deprecate the existing overload that takes explicit `Serde` parameters.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>

Closes #3802 from dguy/kip-182-ktable-groupby
Damian Guy 2017-09-07 12:35:31 +01:00
parent 9cbb9f0939
commit 329d5fa64a
6 changed files with 62 additions and 24 deletions
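
For reference, a minimal caller-side sketch of the change; the mapper, Serdes, class name, and table below are illustrative placeholders, not part of this commit:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;

public class GroupByMigrationSketch {

    // Re-groups a KTable<String, String> by swapping key and value (hypothetical example).
    static KGroupedTable<String, String> regroup(final KTable<String, String> table) {
        // Before this change (now deprecated): key and value Serdes passed as two separate parameters.
        // table.groupBy((key, value) -> KeyValue.pair(value, key), Serdes.String(), Serdes.String());

        // After this change: the Serdes are bundled into a single Serialized argument.
        return table.groupBy(
                (key, value) -> KeyValue.pair(value, key),
                Serialized.with(Serdes.String(), Serdes.String()));
    }
}
```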

KTable.java

@@ -1001,7 +1001,7 @@ public interface KTable<K, V> {
* records to and rereading all update records from it, such that the resulting {@link KGroupedTable} is partitioned
* on the new key.
* <p>
* If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serde, Serde)}
* If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)}
* instead.
*
* @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
@@ -1011,6 +1011,35 @@ public interface KTable<K, V> {
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Serialized}.
* Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
* provided {@link KeyValueMapper}.
* Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedTable}).
* The {@link KeyValueMapper} selects a new key and value (both of which may be of a different type).
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}.
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
* an internally generated name, and "-repartition" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* <p>
* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
* records to and rereading all update records from it, such that the resulting {@link KGroupedTable} is partitioned
* on the new key.
*
* @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
* @param serialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
* @param <KR> the key type of the result {@link KGroupedTable}
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
final Serialized<KR, VR> serialized);
/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}.
* Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
@@ -1038,7 +1067,9 @@ public interface KTable<K, V> {
* @param <KR> the key type of the result {@link KGroupedTable}
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
* @deprecated use {@link #groupBy(KeyValueMapper, Serialized)}
*/
@Deprecated
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
final Serde<KR> keySerde,
final Serde<VR> valueSerde);
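
To illustrate the javadoc above: re-grouping is only a preparation step, and the "${applicationId}-XXX-repartition" topic it describes carries the re-keyed updates once an aggregation such as `count()` is applied to the resulting `KGroupedTable`. A minimal sketch, assuming the imports from the earlier example, a `KTable<String, String> userTable` mapping user IDs to regions, and an illustrative store name:

```java
// Assumes the imports from the earlier sketch.
static KTable<String, Long> countUsersPerRegion(final KTable<String, String> userTable) {
    // Re-key the table (userId -> region) by region and count users per region.
    return userTable
            .groupBy(
                    (userId, region) -> KeyValue.pair(region, userId),   // select a new key
                    Serialized.with(Serdes.String(), Serdes.String()))
            .count("region-counts");                                      // store name is illustrative
}
```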

KeyValueMapper.java

@@ -39,9 +39,9 @@ import org.apache.kafka.streams.KeyValue;
* @see KStream#flatMap(KeyValueMapper)
* @see KStream#selectKey(KeyValueMapper)
* @see KStream#groupBy(KeyValueMapper)
* @see KStream#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
* @see KStream#groupBy(KeyValueMapper, Serialized)
* @see KTable#groupBy(KeyValueMapper)
* @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
* @see KTable#groupBy(KeyValueMapper, Serialized)
* @see KTable#toStream(KeyValueMapper)
*/
public interface KeyValueMapper<K, V, VR> {

KTableImpl.java

@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.PrintForeachAction;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -609,21 +610,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
final Serde<K1> keySerde,
final Serde<V1> valueSerde) {
return groupBy(selector, Serialized.with(keySerde, valueSerde));
}
@Override
public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
return this.groupBy(selector, Serialized.<K1, V1>with(null, null));
}
@Override
public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
final Serialized<K1, V1> serialized) {
Objects.requireNonNull(selector, "selector can't be null");
Objects.requireNonNull(serialized, "serialized can't be null");
String selectName = builder.newName(SELECT_NAME);
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector);
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
// select the aggregate key and values (old and new), it would require parent to send old values
builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name);
this.enableSendingOldValues();
return new KGroupedTableImpl<>(builder, selectName, this.name, keySerde, valueSerde);
}
@Override
public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
return this.groupBy(selector, null, null);
return new KGroupedTableImpl<>(builder, selectName, this.name, serialized.keySerde(), serialized.valueSerde());
}
@SuppressWarnings("unchecked")
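
As the implementation above shows, all three `groupBy` overloads now converge on the `Serialized`-based code path; a `Serialized` carrying `null` Serdes (which is what the no-Serde overload passes) falls back to the default Serdes configured in `StreamsConfig`. A sketch of the three equivalent entry points, assuming the imports from the first sketch plus `KeyValueMapper`, with `table` and `selector` as placeholders:

```java
// All three calls reach the same Serialized-based implementation in KTableImpl.
static void equivalentEntryPoints(final KTable<String, String> table,
                                  final KeyValueMapper<String, String, KeyValue<String, String>> selector) {
    table.groupBy(selector);                                                     // Serialized.with(null, null) -> default Serdes from config
    table.groupBy(selector, Serdes.String(), Serdes.String());                   // deprecated, now wrapped via Serialized.with(keySerde, valueSerde)
    table.groupBy(selector, Serialized.with(Serdes.String(), Serdes.String()));  // new overload added by this commit
}
```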

KTableAggregateTest.java

@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
@@ -49,6 +50,7 @@ import static org.junit.Assert.assertEquals;
public class KTableAggregateTest {
final private Serde<String> stringSerde = Serdes.String();
private final Serialized<String, String> stringSerialized = Serialized.with(stringSerde, stringSerde);
private File stateDir = null;
@@ -70,8 +72,7 @@ public class KTableAggregateTest {
KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
stringSerde,
stringSerde
stringSerialized
).aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER,
@@ -119,8 +120,7 @@ public class KTableAggregateTest {
KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
stringSerde,
stringSerde
stringSerialized
).aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER,
@@ -160,8 +160,7 @@ public class KTableAggregateTest {
}
}
},
stringSerde,
stringSerde
stringSerialized
)
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -234,7 +233,7 @@ public class KTableAggregateTest {
final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialized)
.count("count")
.toStream()
.process(proc);
@@ -249,7 +248,7 @@ public class KTableAggregateTest {
final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialized)
.count()
.toStream()
.process(proc);
@@ -264,7 +263,7 @@ public class KTableAggregateTest {
final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialized)
.count("count")
.toStream()
.process(proc);
@@ -299,7 +298,7 @@ public class KTableAggregateTest {
public KeyValue<String, String> apply(String key, String value) {
return KeyValue.pair(String.valueOf(key.charAt(0)), String.valueOf(key.charAt(1)));
}
}, stringSerde, stringSerde)
}, stringSerialized)
.aggregate(new Initializer<String>() {
@Override
@@ -358,7 +357,7 @@ public class KTableAggregateTest {
public KeyValue<String, Long> apply(final Long key, final String value) {
return new KeyValue<>(value, key);
}
}, Serdes.String(), Serdes.Long())
}, Serialized.with(Serdes.String(), Serdes.Long()))
.reduce(new Reducer<Long>() {
@Override
public Long apply(final Long value1, final Long value2) {

KTableKTableLeftJoinTest.java

@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -346,7 +347,7 @@ public class KTableKTableLeftJoinTest {
public KeyValue<Long, String> apply(final Long key, final String value) {
return new KeyValue<>(key, value);
}
}, Serdes.Long(), Serdes.String()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store");
}, Serialized.with(Serdes.Long(), Serdes.String())).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store");
final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);

SmokeTestClient.java

@@ -219,8 +219,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// test repartition
Agg agg = new Agg();
cntTable.groupBy(agg.selector(),
stringSerde,
longSerde
Serialized.with(stringSerde, longSerde)
).aggregate(agg.init(),
agg.adder(),
agg.remover(),