KAFKA-3081: KTable Aggregation

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #761 from guozhangwang/K3081
This commit is contained in:
Guozhang Wang 2016-01-13 17:15:57 -08:00
parent 70011747c8
commit 4f22705c7d
15 changed files with 777 additions and 173 deletions

View File

@ -20,8 +20,6 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Collection;
/**
* KTable is an abstraction of a change log stream.
*
@ -152,9 +150,11 @@ public interface KTable<K, V> {
*/
<K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K> keySerializer,
Serializer<K1> keySerializer,
Serializer<V1> valueSerializer,
Serializer<V2> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<K1> keyDeserializer,
Deserializer<V1> valueDeserializer,
Deserializer<V2> aggValueDeserializer,
String name);
@ -167,36 +167,10 @@ public interface KTable<K, V> {
*/
<K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToLongMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
Serializer<K1> keySerializer,
Deserializer<K1> keyDeserializer,
String name);
/**
* Sum extracted integer values of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
* @param name the name of the resulted table
*/
<K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToIntMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name);
/**
* Sum extracted double decimal values of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
* @param name the name of the resulted table
*/
<K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToDoubleMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name);
/**
* Count number of records of this table by the selected aggregation key
*
@ -204,22 +178,10 @@ public interface KTable<K, V> {
* @param name the name of the resulted table
*/
<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
Serializer<K1> keySerializer,
Serializer<V> valueSerializer,
Deserializer<K1> keyDeserializer,
Deserializer<V> valueDeserializer,
String name);
/**
* Get the top-k values of this table by the selected aggregation key
*
* @param k parameter of the top-k computation
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param name the name of the resulted table
*/
<K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Serializer<V1> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> aggValueDeserializer,
String name);
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
private static final int NEWFLAG_SIZE = 1;
private final Deserializer<T> inner;
public ChangedDeserializer(Deserializer<T> inner) {
this.inner = inner;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public Change<T> deserialize(String topic, byte[] data) {
byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
System.arraycopy(data, 0, bytes, 0, bytes.length);
if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
return new Change<>(inner.deserialize(topic, bytes), null);
} else {
return new Change<>(null, inner.deserialize(topic, bytes));
}
}
@Override
public void close() {
inner.close();
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class ChangedSerializer<T> implements Serializer<Change<T>> {
private static final int NEWFLAG_SIZE = 1;
private final Serializer<T> inner;
public ChangedSerializer(Serializer<T> inner) {
this.inner = inner;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public byte[] serialize(String topic, Change<T> data) {
// only one of the old / new values would be not null
byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
buf.put(serializedKey);
buf.put((byte) (data.newValue != null ? 1 : 0));
return buf.array();
}
@Override
public void close() {
inner.close();
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> {
private class Count implements Aggregator<K, V, Long> {
@Override
public Long initialValue() {
return 0L;
}
@Override
public Long add(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
@Override
public Long remove(K aggKey, V value, Long aggregate) {
return aggregate - 1;
}
@Override
public Long merge(Long aggr1, Long aggr2) {
return aggr1 + aggr2;
}
}
@Override
public Aggregator<K, V, Long> get() {
return new Count();
}
}

View File

@ -250,16 +250,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier,
Deserializer<K> keyDeserialier,
Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier) {
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> thisValueDeserializer,
Deserializer<V1> otherValueDeserializer) {
return join(other, joiner, windows,
keySerialzier, thisValueSerialzier, otherValueSerialzier,
keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
keySerializer, thisValueSerializer, otherValueSerializer,
keyDeserializer, thisValueDeserializer, otherValueDeserializer, false);
}
@Override
@ -267,16 +267,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier,
Deserializer<K> keyDeserialier,
Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier) {
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> thisValueDeserializer,
Deserializer<V1> otherValueDeserializer) {
return join(other, joiner, windows,
keySerialzier, thisValueSerialzier, otherValueSerialzier,
keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
keySerializer, thisValueSerializer, otherValueSerializer,
keyDeserializer, thisValueDeserializer, otherValueDeserializer, true);
}
@SuppressWarnings("unchecked")
@ -284,12 +284,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier,
Deserializer<K> keyDeserialier,
Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier,
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> thisValueDeserializer,
Deserializer<V1> otherValueDeserializer,
boolean outer) {
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@ -301,7 +301,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.after,
windows.maintainMs(),
windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
null);
RocksDBWindowStoreSupplier<K, V1> otherWindow =
@ -311,7 +311,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.after,
windows.maintainMs(),
windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
@ -344,10 +344,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V1> otherValueSerialzier,
Deserializer<K> keyDeserialier,
Deserializer<V1> otherValueDeserialzier) {
Serializer<K> keySerializer,
Serializer<V1> otherValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> otherValueDeserializer) {
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@ -358,7 +358,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.after,
windows.maintainMs(),
windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
private final String storeName;
private final Aggregator<K, V, T> aggregator;
private boolean sendOldValues = false;
KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
this.storeName = storeName;
this.aggregator = aggregator;
}
@Override
public void enableSendingOldValues() {
sendOldValues = true;
}
@Override
public Processor<K, Change<V>> get() {
return new KTableAggregateProcessor();
}
private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
private KeyValueStore<K, T> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
}
@Override
public void process(K key, Change<V> value) {
T oldAgg = store.get(key);
if (oldAgg == null)
oldAgg = aggregator.initialValue();
T newAgg = oldAgg;
// first try to remove the old value
if (value.oldValue != null) {
newAgg = aggregator.remove(key, value.oldValue, newAgg);
}
// then try to add the new new value
if (value.newValue != null) {
newAgg = aggregator.add(key, value.newValue, newAgg);
}
// update the store with the new value
store.put(key, newAgg);
// send the old / new pair
if (sendOldValues)
context().forward(key, new Change<>(newAgg, oldAgg));
else
context().forward(key, new Change<>(newAgg, null));
}
}
@Override
public KTableValueGetterSupplier<K, T> view() {
return new KTableValueGetterSupplier<K, T>() {
public KTableValueGetter<K, T> get() {
return new KTableAggregateValueGetter();
}
};
}
private class KTableAggregateValueGetter implements KTableValueGetter<K, T> {
private KeyValueStore<K, T> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
}
@Override
public T get(K key) {
return store.get(key);
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.KStream;
@ -25,16 +27,15 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
/**
@ -51,6 +52,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
private static final String SELECT_NAME = "KTABLE-SELECT-";
private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
public static final String SOURCE_NAME = "KTABLE-SOURCE-";
public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
@ -168,47 +173,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KStreamImpl<>(topology, name, sourceNodes);
}
@SuppressWarnings("unchecked")
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
materialize(source);
return new KTableSourceValueGetterSupplier<>(source.topic);
} else {
return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
}
}
@SuppressWarnings("unchecked")
void enableSendingOldValues() {
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
materialize(source);
source.enableSendingOldValues();
} else {
((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
}
sendOldValues = true;
}
}
boolean sendingOldValueEnabled() {
return sendOldValues;
}
private void materialize(KTableSource<K, ?> source) {
synchronized (source) {
if (!source.isMaterialized()) {
StateStoreSupplier storeSupplier =
new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
// mark this state as non internal hence it is read directly from a user topic
topology.addStateStore(storeSupplier, false, name);
source.materialize();
}
}
}
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
@ -281,63 +245,142 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K> keySerializer,
Serializer<K1> keySerializer,
Serializer<V1> valueSerializer,
Serializer<V2> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<K1> keyDeserializer,
Deserializer<V1> valueDeserializer,
Deserializer<V2> aggValueDeserializer,
String name) {
// TODO
return null;
String selectName = topology.newName(SELECT_NAME);
String sinkName = topology.newName(KStreamImpl.SINK_NAME);
String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
String aggregateName = topology.newName(AGGREGATE_NAME);
String topic = name + "-repartition";
ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get());
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
.withValues(aggValueSerializer, aggValueDeserializer)
.localDatabase()
.build();
// select the aggregate key and values (old and new), it would require parent to send old values
topology.addProcessor(selectName, selectSupplier, this.name);
this.enableSendingOldValues();
// send the aggregate key-value pairs to the intermediate topic for partitioning
topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
// read the intermediate topic
topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
// aggregate the values with the aggregator and local store
topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
topology.addStateStore(aggregateStore, aggregateName);
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
}
@Override
public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToLongMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector,
final KeyValueToLongMapper<K, V> valueSelector,
Serializer<K1> keySerializer,
Deserializer<K1> keyDeserializer,
String name) {
// TODO
return null;
Serializer<Long> longSerializer = new LongSerializer();
Deserializer<Long> longDeserializer = new LongDeserializer();
KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() {
@Override
public KeyValue<K1, Long> apply(K key, V value) {
K1 aggKey = keySelector.apply(key, value);
Long aggValue = valueSelector.apply(key, value);
return new KeyValue<>(aggKey, aggValue);
}
};
return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper,
keySerializer, longSerializer, longSerializer,
keyDeserializer, longDeserializer, longDeserializer,
name);
}
@Override
public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToIntMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name) {
// TODO
return null;
}
@Override
public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToDoubleMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector,
Serializer<K1> keySerializer,
Serializer<V> valueSerializer,
Deserializer<K1> keyDeserializer,
Deserializer<V> valueDeserializer,
String name) {
// TODO
return null;
Serializer<Long> longSerializer = new LongSerializer();
Deserializer<Long> longDeserializer = new LongDeserializer();
KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
@Override
public KeyValue<K1, V> apply(K key, V value) {
K1 aggKey = keySelector.apply(key, value);
return new KeyValue<>(aggKey, value);
}
};
return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper,
keySerializer, valueSerializer, longSerializer,
keyDeserializer, valueDeserializer, longDeserializer,
name);
}
@Override
public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name) {
// TODO
return null;
@SuppressWarnings("unchecked")
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
materialize(source);
return new KTableSourceValueGetterSupplier<>(source.topic);
} else {
return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
}
}
@Override
public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Serializer<V1> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> aggValueDeserializer,
String name) {
// TODO
return null;
@SuppressWarnings("unchecked")
void enableSendingOldValues() {
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
materialize(source);
source.enableSendingOldValues();
} else {
((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
}
sendOldValues = true;
}
}
boolean sendingOldValueEnabled() {
return sendOldValues;
}
private void materialize(KTableSource<K, ?> source) {
synchronized (source) {
if (!source.isMaterialized()) {
StateStoreSupplier storeSupplier =
new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
// mark this state as non internal hence it is read directly from a user topic
topology.addStateStore(storeSupplier, false, name);
source.materialize();
}
}
}
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
private final KTableImpl<K1, ?, V1> parent;
@ -36,7 +37,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
@Override
public Processor<K1, Change<V1>> get() {
return new KTableMapProcessor();
return new KTableMapValuesProcessor();
}
@Override
@ -67,16 +68,15 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
return newValue;
}
private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> {
@Override
public void process(K1 key, Change<V1> change) {
V2 newValue = computeValue(change.newValue);
V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
context().forward(key, new Change(newValue, oldValue));
context().forward(key, new Change<>(newValue, oldValue));
}
}
private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* KTable repartition map functions are not exposed to public APIs, but only used for keyed aggregations.
*
* Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
*/
public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> {
private final KTableImpl<K1, ?, V1> parent;
private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
this.parent = parent;
this.mapper = mapper;
}
@Override
public Processor<K1, Change<V1>> get() {
return new KTableMapProcessor();
}
@Override
public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() {
final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() {
public KTableValueGetter<K1, KeyValue<K2, V2>> get() {
return new KTableMapValueGetter(parentValueGetterSupplier.get());
}
};
}
@Override
public void enableSendingOldValues() {
// this should never be called
throw new KafkaException("KTableRepartitionMap should always require sending old values.");
}
private KeyValue<K2, V2> computeValue(K1 key, V1 value) {
KeyValue<K2, V2> newValue = null;
if (key != null || value != null)
newValue = mapper.apply(key, value);
return newValue;
}
private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
@Override
public void process(K1 key, Change<V1> change) {
KeyValue<K2, V2> newPair = computeValue(key, change.newValue);
context().forward(newPair.key, new Change<>(newPair.value, null));
if (change.oldValue != null) {
KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue);
context().forward(oldPair.key, new Change<>(null, oldPair.value));
}
}
}
private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> {
private final KTableValueGetter<K1, V1> parentGetter;
public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) {
this.parentGetter = parentGetter;
}
@Override
public void init(ProcessorContext context) {
parentGetter.init(context);
}
@Override
public KeyValue<K2, V2> get(K1 key) {
return computeValue(key, parentGetter.get(key));
}
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> {
private class LongSum implements Aggregator<K, Long, Long> {
@Override
public Long initialValue() {
return 0L;
}
@Override
public Long add(K aggKey, Long value, Long aggregate) {
return aggregate + value;
}
@Override
public Long remove(K aggKey, Long value, Long aggregate) {
return aggregate - value;
}
@Override
public Long merge(Long aggr1, Long aggr2) {
return aggr1 + aggr2;
}
}
@Override
public Aggregator<K, Long, Long> get() {
return new LongSum();
}
}

View File

@ -23,13 +23,13 @@ import org.apache.kafka.streams.kstream.Windowed;
import java.nio.ByteBuffer;
import java.util.Map;
public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> {
private static final int TIMESTAMP_SIZE = 8;
private Deserializer<T> inner;
public DefaultWindowedDeserializer(Deserializer<T> inner) {
public WindowedDeserializer(Deserializer<T> inner) {
this.inner = inner;
}

View File

@ -23,13 +23,13 @@ import org.apache.kafka.streams.kstream.Windowed;
import java.nio.ByteBuffer;
import java.util.Map;
public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> {
public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
private static final int TIMESTAMP_SIZE = 8;
private Serializer<T> inner;
public DefaultWindowedSerializer(Serializer<T> inner) {
public WindowedSerializer(Serializer<T> inner) {
this.inner = inner;
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.NoOpKeyValueMapper;
import org.junit.Test;
import java.io.File;
import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
public class KTableAggregateTest {
private final Serializer<String> strSerializer = new StringSerializer();
private final Deserializer<String> strDeserializer = new StringDeserializer();
private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
private class StringCanonizer implements Aggregator<String, String, String> {
@Override
public String initialValue() {
return "";
}
@Override
public String add(String aggKey, String value, String aggregate) {
return aggregate + "+" + value;
}
@Override
public String remove(String aggKey, String value, String aggregate) {
return aggregate + "-" + value;
}
@Override
public String merge(String aggr1, String aggr2) {
return "(" + aggr1 + ") + (" + aggr2 + ")";
}
}
@Override
public Aggregator<String, String, String> get() {
return new StringCanonizer();
}
}
@Test
public void testAggBasic() throws Exception {
final File baseDir = Files.createTempDirectory("test").toFile();
try {
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(),
new NoOpKeyValueMapper<String, String>(),
strSerializer,
strSerializer,
strSerializer,
strDeserializer,
strDeserializer,
strDeserializer,
"topic1-Canonized");
MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
driver.process(topic1, "A", "1");
driver.process(topic1, "B", "2");
driver.process(topic1, "A", "3");
driver.process(topic1, "B", "4");
driver.process(topic1, "C", "5");
driver.process(topic1, "D", "6");
driver.process(topic1, "B", "7");
driver.process(topic1, "C", "8");
assertEquals(Utils.mkList(
"A:+1",
"B:+2",
"A:+1+3", "A:+1+3-1",
"B:+2+4", "B:+2+4-2",
"C:+5",
"D:+6",
"B:+2+4-2+7", "B:+2+4-2+7-4",
"C:+5+8", "C:+5+8-5"), proc2.processed);
} finally {
Utils.delete(baseDir);
}
}
}

View File

@ -39,7 +39,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class KTableMapValuesImplTest {
public class KTableMapValuesTest {
private final Serializer<String> strSerializer = new StringSerializer();
private final Deserializer<String> strDeserializer = new StringDeserializer();

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
@Override
public KeyValue<K, V> apply(K key, V value) {
return new KeyValue<>(key, value);
}
}