diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 997edcd53d8..9837dae63bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -20,8 +20,6 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import java.util.Collection; - /** * KTable is an abstraction of a change log stream. * @@ -152,9 +150,11 @@ public interface KTable { */ KTable aggregate(AggregatorSupplier aggregatorSupplier, KeyValueMapper> selector, - Serializer keySerializer, + Serializer keySerializer, + Serializer valueSerializer, Serializer aggValueSerializer, - Deserializer keyDeserializer, + Deserializer keyDeserializer, + Deserializer valueDeserializer, Deserializer aggValueDeserializer, String name); @@ -167,36 +167,10 @@ public interface KTable { */ KTable sum(KeyValueMapper keySelector, KeyValueToLongMapper valueSelector, - Serializer keySerializer, - Deserializer keyDeserializer, + Serializer keySerializer, + Deserializer keyDeserializer, String name); - /** - * Sum extracted integer values of this table by the selected aggregation key - * - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value - * @param name the name of the resulted table - */ - KTable sum(KeyValueMapper keySelector, - KeyValueToIntMapper valueSelector, - Serializer keySerializer, - Deserializer keyDeserializer, - String name); - - /** - * Sum extracted double decimal values of this table by the selected aggregation key - * - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value - * @param name the name of the resulted table - */ - KTable sum(KeyValueMapper keySelector, - KeyValueToDoubleMapper valueSelector, - Serializer keySerializer, - Deserializer keyDeserializer, - String name); - /** * Count number of records of this table by the selected aggregation key * @@ -204,22 +178,10 @@ public interface KTable { * @param name the name of the resulted table */ KTable count(KeyValueMapper keySelector, - Serializer keySerializer, - Deserializer keyDeserializer, + Serializer keySerializer, + Serializer valueSerializer, + Deserializer keyDeserializer, + Deserializer valueDeserializer, String name); - /** - * Get the top-k values of this table by the selected aggregation key - * - * @param k parameter of the top-k computation - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param name the name of the resulted table - */ - > KTable> topK(int k, - KeyValueMapper keySelector, - Serializer keySerializer, - Serializer aggValueSerializer, - Deserializer keyDeserializer, - Deserializer aggValueDeserializer, - String name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java new file mode 100644 index 00000000000..d4c4e2deb50 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
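// Reviewer sketch, not part of the patch: one way the reworked KTable.count() above might be
// wired up. The class, topic names, the store name and the serde choices are hypothetical; the
// extra value serializer/deserializer pair is the new requirement, because the selected values
// now travel through an internal "<name>-repartition" topic before being aggregated.
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;

public class UsersPerRegionExample {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // change log of user -> region assignments
        KTable<String, String> regionByUser = builder.table(
                new StringSerializer(), new StringSerializer(),
                new StringDeserializer(), new StringDeserializer(),
                "user-regions");

        // continuously maintained count of users per region
        KTable<String, Long> usersPerRegion = regionByUser.count(
                new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String user, String region) {
                        return region;                                   // aggregation key
                    }
                },
                new StringSerializer(), new StringSerializer(),          // repartition key / value serializers
                new StringDeserializer(), new StringDeserializer(),      // repartition key / value deserializers
                "UsersPerRegion");

        // usersPerRegion can now be filtered, joined or written out like any other KTable
    }
}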
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ChangedDeserializer implements Deserializer> { + + private static final int NEWFLAG_SIZE = 1; + + private final Deserializer inner; + + public ChangedDeserializer(Deserializer inner) { + this.inner = inner; + } + + @Override + public void configure(Map configs, boolean isKey) { + // do nothing + } + + @Override + public Change deserialize(String topic, byte[] data) { + + byte[] bytes = new byte[data.length - NEWFLAG_SIZE]; + + System.arraycopy(data, 0, bytes, 0, bytes.length); + + if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) { + return new Change<>(inner.deserialize(topic, bytes), null); + } else { + return new Change<>(null, inner.deserialize(topic, bytes)); + } + } + + + @Override + public void close() { + inner.close(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java new file mode 100644 index 00000000000..e9b7cada80d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ChangedSerializer implements Serializer> { + + private static final int NEWFLAG_SIZE = 1; + + private final Serializer inner; + + public ChangedSerializer(Serializer inner) { + this.inner = inner; + } + + @Override + public void configure(Map configs, boolean isKey) { + // do nothing + } + + @Override + public byte[] serialize(String topic, Change data) { + // only one of the old / new values would be not null + byte[] serializedKey = inner.serialize(topic, data.newValue != null ? 
data.newValue : data.oldValue); + + ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE); + buf.put(serializedKey); + buf.put((byte) (data.newValue != null ? 1 : 0)); + + return buf.array(); + } + + + @Override + public void close() { + inner.close(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java new file mode 100644 index 00000000000..b7dc5aa3371 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; + +public class CountSupplier implements AggregatorSupplier { + + private class Count implements Aggregator { + + @Override + public Long initialValue() { + return 0L; + } + + @Override + public Long add(K aggKey, V value, Long aggregate) { + return aggregate + 1; + } + + @Override + public Long remove(K aggKey, V value, Long aggregate) { + return aggregate - 1; + } + + @Override + public Long merge(Long aggr1, Long aggr2) { + return aggr1 + aggr2; + } + } + + @Override + public Aggregator get() { + return new Count(); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 4505e74bbd4..f53c0d0c849 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -250,16 +250,16 @@ public class KStreamImpl extends AbstractStream implements KStream other, ValueJoiner joiner, JoinWindows windows, - Serializer keySerialzier, - Serializer thisValueSerialzier, - Serializer otherValueSerialzier, - Deserializer keyDeserialier, - Deserializer thisValueDeserialzier, - Deserializer otherValueDeserialzier) { + Serializer keySerializer, + Serializer thisValueSerializer, + Serializer otherValueSerializer, + Deserializer keyDeserializer, + Deserializer thisValueDeserializer, + Deserializer otherValueDeserializer) { return join(other, joiner, windows, - keySerialzier, thisValueSerialzier, otherValueSerialzier, - keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false); + keySerializer, thisValueSerializer, otherValueSerializer, + keyDeserializer, thisValueDeserializer, otherValueDeserializer, false); } @Override @@ -267,16 +267,16 @@ public class KStreamImpl extends AbstractStream implements KStream other, ValueJoiner 
joiner, JoinWindows windows, - Serializer keySerialzier, - Serializer thisValueSerialzier, - Serializer otherValueSerialzier, - Deserializer keyDeserialier, - Deserializer thisValueDeserialzier, - Deserializer otherValueDeserialzier) { + Serializer keySerializer, + Serializer thisValueSerializer, + Serializer otherValueSerializer, + Deserializer keyDeserializer, + Deserializer thisValueDeserializer, + Deserializer otherValueDeserializer) { return join(other, joiner, windows, - keySerialzier, thisValueSerialzier, otherValueSerialzier, - keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true); + keySerializer, thisValueSerializer, otherValueSerializer, + keyDeserializer, thisValueDeserializer, otherValueDeserializer, true); } @SuppressWarnings("unchecked") @@ -284,12 +284,12 @@ public class KStreamImpl extends AbstractStream implements KStream other, ValueJoiner joiner, JoinWindows windows, - Serializer keySerialzier, - Serializer thisValueSerialzier, - Serializer otherValueSerialzier, - Deserializer keyDeserialier, - Deserializer thisValueDeserialzier, - Deserializer otherValueDeserialzier, + Serializer keySerializer, + Serializer thisValueSerializer, + Serializer otherValueSerializer, + Deserializer keyDeserializer, + Deserializer thisValueDeserializer, + Deserializer otherValueDeserializer, boolean outer) { Set allSourceNodes = ensureJoinableWith((AbstractStream) other); @@ -301,7 +301,7 @@ public class KStreamImpl extends AbstractStream implements KStream("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier), + new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer), null); RocksDBWindowStoreSupplier otherWindow = @@ -311,7 +311,7 @@ public class KStreamImpl extends AbstractStream implements KStream("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), + new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer), null); KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name()); @@ -344,10 +344,10 @@ public class KStreamImpl extends AbstractStream implements KStream other, ValueJoiner joiner, JoinWindows windows, - Serializer keySerialzier, - Serializer otherValueSerialzier, - Deserializer keyDeserialier, - Deserializer otherValueDeserialzier) { + Serializer keySerializer, + Serializer otherValueSerializer, + Deserializer keyDeserializer, + Deserializer otherValueDeserializer) { Set allSourceNodes = ensureJoinableWith((AbstractStream) other); @@ -358,7 +358,7 @@ public class KStreamImpl extends AbstractStream implements KStream("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), + new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer), null); KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java new file mode 100644 index 00000000000..a5948f8da27 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
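// Reviewer sketch, not part of the patch: round trip through the new Changed serde introduced
// above. The wire format is the inner-serialized value followed by a single flag byte — 1 when
// the wrapped value is a new value, 0 when it is a retracted old value — so only one side of a
// Change ever crosses the repartition topic. Change is assumed to expose a
// Change(newValue, oldValue) constructor and public newValue/oldValue fields, as used elsewhere
// in this patch; the topic name is arbitrary.
ChangedSerializer<String> serializer = new ChangedSerializer<>(new StringSerializer());
ChangedDeserializer<String> deserializer = new ChangedDeserializer<>(new StringDeserializer());

byte[] addition = serializer.serialize("some-repartition-topic", new Change<>("APAC", null));
byte[] retraction = serializer.serialize("some-repartition-topic", new Change<>(null, "EMEA"));

Change<String> decodedAdd = deserializer.deserialize("some-repartition-topic", addition);
Change<String> decodedRemove = deserializer.deserialize("some-repartition-topic", retraction);
// decodedAdd:    newValue = "APAC", oldValue = null    (flag byte was 1)
// decodedRemove: newValue = null,   oldValue = "EMEA"  (flag byte was 0)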
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; + +public class KTableAggregate implements KTableProcessorSupplier { + + private final String storeName; + private final Aggregator aggregator; + + private boolean sendOldValues = false; + + KTableAggregate(String storeName, Aggregator aggregator) { + this.storeName = storeName; + this.aggregator = aggregator; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + @Override + public Processor> get() { + return new KTableAggregateProcessor(); + } + + private class KTableAggregateProcessor extends AbstractProcessor> { + + private KeyValueStore store; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + store = (KeyValueStore) context.getStateStore(storeName); + } + + @Override + public void process(K key, Change value) { + T oldAgg = store.get(key); + + if (oldAgg == null) + oldAgg = aggregator.initialValue(); + + T newAgg = oldAgg; + + // first try to remove the old value + if (value.oldValue != null) { + newAgg = aggregator.remove(key, value.oldValue, newAgg); + } + + // then try to add the new new value + if (value.newValue != null) { + newAgg = aggregator.add(key, value.newValue, newAgg); + } + + // update the store with the new value + store.put(key, newAgg); + + // send the old / new pair + if (sendOldValues) + context().forward(key, new Change<>(newAgg, oldAgg)); + else + context().forward(key, new Change<>(newAgg, null)); + } + } + + @Override + public KTableValueGetterSupplier view() { + + return new KTableValueGetterSupplier() { + + public KTableValueGetter get() { + return new KTableAggregateValueGetter(); + } + + }; + } + + private class KTableAggregateValueGetter implements KTableValueGetter { + + private KeyValueStore store; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + store = (KeyValueStore) context.getStateStore(storeName); + } + + @Override + public T get(K key) { + return store.get(key); + } + + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 32d3cc52a64..7f30f59e9e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import 
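// Reviewer sketch, not part of the patch: how KTableAggregateProcessor above folds updates into
// the running aggregate. A non-null oldValue is removed and a non-null newValue is added; in the
// aggregate() topology the repartition step forwards the new value first and the retracted old
// value second, so an update briefly passes through an intermediate aggregate (compare the
// "+1+3" / "+1+3-1" pairs asserted in KTableAggregateTest further down). LongSumSupplier is the
// summing supplier added later in this patch; its single type parameter (the aggregation key)
// is assumed here.
Aggregator<String, Long, Long> sum = new LongSumSupplier<String>().get();

Long agg = sum.initialValue();            // 0
agg = sum.add("clicks", 5L, agg);         // first value for the key              -> 5
// the key's value is then updated from 5 to 7, i.e. Change(newValue = 7, oldValue = 5)
agg = sum.add("clicks", 7L, agg);         // addition record for the new value    -> 12
agg = sum.remove("clicks", 5L, agg);      // retraction record for the old value  -> 7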
org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.AggregatorSupplier; import org.apache.kafka.streams.kstream.KStream; @@ -25,16 +27,15 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper; -import org.apache.kafka.streams.kstream.KeyValueToIntMapper; import org.apache.kafka.streams.kstream.KeyValueToLongMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.Stores; -import java.util.Collection; +import java.util.Collections; import java.util.Set; /** @@ -51,6 +52,10 @@ public class KTableImpl extends AbstractStream implements KTable extends AbstractStream implements KTable(topology, name, sourceNodes); } - @SuppressWarnings("unchecked") - KTableValueGetterSupplier valueGetterSupplier() { - if (processorSupplier instanceof KTableSource) { - KTableSource source = (KTableSource) processorSupplier; - materialize(source); - return new KTableSourceValueGetterSupplier<>(source.topic); - } else { - return ((KTableProcessorSupplier) processorSupplier).view(); - } - } - - @SuppressWarnings("unchecked") - void enableSendingOldValues() { - if (!sendOldValues) { - if (processorSupplier instanceof KTableSource) { - KTableSource source = (KTableSource) processorSupplier; - materialize(source); - source.enableSendingOldValues(); - } else { - ((KTableProcessorSupplier) processorSupplier).enableSendingOldValues(); - } - sendOldValues = true; - } - } - - boolean sendingOldValueEnabled() { - return sendOldValues; - } - - private void materialize(KTableSource source) { - synchronized (source) { - if (!source.isMaterialized()) { - StateStoreSupplier storeSupplier = - new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null); - // mark this state as non internal hence it is read directly from a user topic - topology.addStateStore(storeSupplier, false, name); - source.materialize(); - } - } - } - @SuppressWarnings("unchecked") @Override public KTable join(KTable other, ValueJoiner joiner) { @@ -281,63 +245,142 @@ public class KTableImpl extends AbstractStream implements KTable KTable aggregate(AggregatorSupplier aggregatorSupplier, KeyValueMapper> selector, - Serializer keySerializer, + Serializer keySerializer, + Serializer valueSerializer, Serializer aggValueSerializer, - Deserializer keyDeserializer, + Deserializer keyDeserializer, + Deserializer valueDeserializer, Deserializer aggValueDeserializer, String name) { - // TODO - return null; + + String selectName = topology.newName(SELECT_NAME); + String sinkName = topology.newName(KStreamImpl.SINK_NAME); + String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); + String aggregateName = topology.newName(AGGREGATE_NAME); + + String topic = name + "-repartition"; + + ChangedSerializer changedValueSerializer = new ChangedSerializer<>(valueSerializer); + ChangedDeserializer changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); + + KTableProcessorSupplier> selectSupplier = new KTableRepartitionMap<>(this, 
selector); + + ProcessorSupplier> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get()); + + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerializer, keyDeserializer) + .withValues(aggValueSerializer, aggValueDeserializer) + .localDatabase() + .build(); + + // select the aggregate key and values (old and new), it would require parent to send old values + topology.addProcessor(selectName, selectSupplier, this.name); + this.enableSendingOldValues(); + + // send the aggregate key-value pairs to the intermediate topic for partitioning + topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName); + + // read the intermediate topic + topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); + + // aggregate the values with the aggregator and local store + topology.addProcessor(aggregateName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, aggregateName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName)); } @Override - public KTable sum(KeyValueMapper keySelector, - KeyValueToLongMapper valueSelector, - Serializer keySerializer, - Deserializer keyDeserializer, + public KTable sum(final KeyValueMapper keySelector, + final KeyValueToLongMapper valueSelector, + Serializer keySerializer, + Deserializer keyDeserializer, String name) { - // TODO - return null; + + Serializer longSerializer = new LongSerializer(); + Deserializer longDeserializer = new LongDeserializer(); + + KeyValueMapper> mapper = new KeyValueMapper>() { + @Override + public KeyValue apply(K key, V value) { + K1 aggKey = keySelector.apply(key, value); + Long aggValue = valueSelector.apply(key, value); + + return new KeyValue<>(aggKey, aggValue); + } + }; + + return this.aggregate(new LongSumSupplier(), mapper, + keySerializer, longSerializer, longSerializer, + keyDeserializer, longDeserializer, longDeserializer, + name); } @Override - public KTable sum(KeyValueMapper keySelector, - KeyValueToIntMapper valueSelector, - Serializer keySerializer, - Deserializer keyDeserializer, - String name) { - // TODO - return null; - } - - @Override - public KTable sum(KeyValueMapper keySelector, - KeyValueToDoubleMapper valueSelector, - Serializer keySerializer, - Deserializer keyDeserializer, + public KTable count(final KeyValueMapper keySelector, + Serializer keySerializer, + Serializer valueSerializer, + Deserializer keyDeserializer, + Deserializer valueDeserializer, String name) { - // TODO - return null; + + Serializer longSerializer = new LongSerializer(); + Deserializer longDeserializer = new LongDeserializer(); + + KeyValueMapper> mapper = new KeyValueMapper>() { + @Override + public KeyValue apply(K key, V value) { + K1 aggKey = keySelector.apply(key, value); + + return new KeyValue<>(aggKey, value); + } + }; + + return this.aggregate(new CountSupplier(), mapper, + keySerializer, valueSerializer, longSerializer, + keyDeserializer, valueDeserializer, longDeserializer, + name); } - @Override - public KTable count(KeyValueMapper keySelector, - Serializer keySerializer, - Deserializer keyDeserializer, - String name) { - // TODO - return null; + @SuppressWarnings("unchecked") + KTableValueGetterSupplier valueGetterSupplier() { + if (processorSupplier instanceof KTableSource) { + KTableSource source = (KTableSource) processorSupplier; + materialize(source); + return new 
KTableSourceValueGetterSupplier<>(source.topic); + } else { + return ((KTableProcessorSupplier) processorSupplier).view(); + } } - @Override - public > KTable> topK(int k, - KeyValueMapper keySelector, - Serializer keySerializer, - Serializer aggValueSerializer, - Deserializer keyDeserializer, - Deserializer aggValueDeserializer, - String name) { - // TODO - return null; + @SuppressWarnings("unchecked") + void enableSendingOldValues() { + if (!sendOldValues) { + if (processorSupplier instanceof KTableSource) { + KTableSource source = (KTableSource) processorSupplier; + materialize(source); + source.enableSendingOldValues(); + } else { + ((KTableProcessorSupplier) processorSupplier).enableSendingOldValues(); + } + sendOldValues = true; + } + } + + boolean sendingOldValueEnabled() { + return sendOldValues; + } + + private void materialize(KTableSource source) { + synchronized (source) { + if (!source.isMaterialized()) { + StateStoreSupplier storeSupplier = + new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null); + // mark this state as non internal hence it is read directly from a user topic + topology.addStateStore(storeSupplier, false, name); + source.materialize(); + } + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index be8085562b5..c664906f185 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; + class KTableMapValues implements KTableProcessorSupplier { private final KTableImpl parent; @@ -36,7 +37,7 @@ class KTableMapValues implements KTableProcessorSupplier @Override public Processor> get() { - return new KTableMapProcessor(); + return new KTableMapValuesProcessor(); } @Override @@ -67,16 +68,15 @@ class KTableMapValues implements KTableProcessorSupplier return newValue; } - private class KTableMapProcessor extends AbstractProcessor> { + private class KTableMapValuesProcessor extends AbstractProcessor> { @Override public void process(K1 key, Change change) { V2 newValue = computeValue(change.newValue); V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null; - context().forward(key, new Change(newValue, oldValue)); + context().forward(key, new Change<>(newValue, oldValue)); } - } private class KTableMapValuesValueGetter implements KTableValueGetter { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java new file mode 100644 index 00000000000..bbef7fb4bd8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
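// Reviewer sketch, not part of the patch: the reworked sum() in use. KeyValueToLongMapper is
// assumed to expose a single apply(key, value) method returning a primitive long, in line with
// the other mapper interfaces; topic and store names are hypothetical. Unlike count(), no value
// serde is passed in: the extracted long is shipped through the repartition topic with the
// built-in Long serde, as the implementation above shows.
KStreamBuilder builder = new KStreamBuilder();

// change log of user -> current cart value in cents
KTable<String, Long> cartValueByUser = builder.table(
        new StringSerializer(), new LongSerializer(),
        new StringDeserializer(), new LongDeserializer(),
        "cart-values");

// one running total across all users, keyed by a constant aggregation key
KTable<String, Long> totalCartValue = cartValueByUser.sum(
        new KeyValueMapper<String, Long, String>() {
            @Override
            public String apply(String user, Long cartValue) {
                return "total";                      // single aggregation bucket
            }
        },
        new KeyValueToLongMapper<String, Long>() {
            @Override
            public long apply(String user, Long cartValue) {
                return cartValue;                    // value to sum
            }
        },
        new StringSerializer(), new StringDeserializer(),   // serde for the selected key
        "TotalCartValue");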
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; + +/** + * KTable repartition map functions are not exposed to public APIs, but only used for keyed aggregations. + * + * Given the input, it can output at most two records (one mapped from old value and one mapped from new value). + */ +public class KTableRepartitionMap implements KTableProcessorSupplier> { + + private final KTableImpl parent; + private final KeyValueMapper> mapper; + + public KTableRepartitionMap(KTableImpl parent, KeyValueMapper> mapper) { + this.parent = parent; + this.mapper = mapper; + } + + @Override + public Processor> get() { + return new KTableMapProcessor(); + } + + @Override + public KTableValueGetterSupplier> view() { + final KTableValueGetterSupplier parentValueGetterSupplier = parent.valueGetterSupplier(); + + return new KTableValueGetterSupplier>() { + + public KTableValueGetter> get() { + return new KTableMapValueGetter(parentValueGetterSupplier.get()); + } + + }; + } + + @Override + public void enableSendingOldValues() { + // this should never be called + throw new KafkaException("KTableRepartitionMap should always require sending old values."); + } + + private KeyValue computeValue(K1 key, V1 value) { + KeyValue newValue = null; + + if (key != null || value != null) + newValue = mapper.apply(key, value); + + return newValue; + } + + private class KTableMapProcessor extends AbstractProcessor> { + + @Override + public void process(K1 key, Change change) { + KeyValue newPair = computeValue(key, change.newValue); + + context().forward(newPair.key, new Change<>(newPair.value, null)); + + if (change.oldValue != null) { + KeyValue oldPair = computeValue(key, change.oldValue); + context().forward(oldPair.key, new Change<>(null, oldPair.value)); + } + } + } + + private class KTableMapValueGetter implements KTableValueGetter> { + + private final KTableValueGetter parentGetter; + + public KTableMapValueGetter(KTableValueGetter parentGetter) { + this.parentGetter = parentGetter; + } + + @Override + public void init(ProcessorContext context) { + parentGetter.init(context); + } + + @Override + public KeyValue get(K1 key) { + return computeValue(key, parentGetter.get(key)); + } + + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java new file mode 100644 index 00000000000..b66590ea7d9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
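// Reviewer sketch, not part of the patch: the dedicated int/double sum() overloads were removed
// from KTable above, but the same behaviour can still be expressed through the generic
// aggregate() with a supplier analogous to the LongSumSupplier that follows. Generic parameters
// are assumed to follow AggregatorSupplier<K, V, T> / Aggregator<K, V, T>.
public class DoubleSumSupplier<K> implements AggregatorSupplier<K, Double, Double> {

    private class DoubleSum implements Aggregator<K, Double, Double> {

        @Override
        public Double initialValue() {
            return 0.0;
        }

        @Override
        public Double add(K aggKey, Double value, Double aggregate) {
            return aggregate + value;
        }

        @Override
        public Double remove(K aggKey, Double value, Double aggregate) {
            return aggregate - value;
        }

        @Override
        public Double merge(Double aggr1, Double aggr2) {
            return aggr1 + aggr2;
        }
    }

    @Override
    public Aggregator<K, Double, Double> get() {
        return new DoubleSum();
    }
}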
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; + +public class LongSumSupplier implements AggregatorSupplier { + + private class LongSum implements Aggregator { + + @Override + public Long initialValue() { + return 0L; + } + + @Override + public Long add(K aggKey, Long value, Long aggregate) { + return aggregate + value; + } + + @Override + public Long remove(K aggKey, Long value, Long aggregate) { + return aggregate - value; + } + + @Override + public Long merge(Long aggr1, Long aggr2) { + return aggr1 + aggr2; + } + } + + @Override + public Aggregator get() { + return new LongSum(); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java similarity index 92% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java index 9a14c53a642..96c36685863 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java @@ -23,13 +23,13 @@ import org.apache.kafka.streams.kstream.Windowed; import java.nio.ByteBuffer; import java.util.Map; -public class DefaultWindowedDeserializer implements Deserializer> { +public class WindowedDeserializer implements Deserializer> { private static final int TIMESTAMP_SIZE = 8; private Deserializer inner; - public DefaultWindowedDeserializer(Deserializer inner) { + public WindowedDeserializer(Deserializer inner) { this.inner = inner; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java similarity index 92% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java index 4bf2b28dcbf..4407a5b9bb2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java @@ -23,13 +23,13 @@ import org.apache.kafka.streams.kstream.Windowed; import java.nio.ByteBuffer; import java.util.Map; -public class DefaultWindowedSerializer implements Serializer> { +public class WindowedSerializer implements Serializer> { private static final int TIMESTAMP_SIZE = 8; private Serializer inner; - public DefaultWindowedSerializer(Serializer inner) { + public WindowedSerializer(Serializer inner) { 
this.inner = inner; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java new file mode 100644 index 00000000000..189cf9d0d82 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.NoOpKeyValueMapper; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; + +public class KTableAggregateTest { + + private final Serializer strSerializer = new StringSerializer(); + private final Deserializer strDeserializer = new StringDeserializer(); + + private class StringCanonizeSupplier implements AggregatorSupplier { + + private class StringCanonizer implements Aggregator { + + @Override + public String initialValue() { + return ""; + } + + @Override + public String add(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; + } + + @Override + public String remove(String aggKey, String value, String aggregate) { + return aggregate + "-" + value; + } + + @Override + public String merge(String aggr1, String aggr2) { + return "(" + aggr1 + ") + (" + aggr2 + ")"; + } + } + + @Override + public Aggregator get() { + return new StringCanonizer(); + } + } + + @Test + public void testAggBasic() throws Exception { + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + + KTable table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); + KTable table2 = table1.aggregate(new StringCanonizeSupplier(), + new NoOpKeyValueMapper(), + strSerializer, + strSerializer, + strSerializer, + strDeserializer, + strDeserializer, + strDeserializer, + "topic1-Canonized"); + + MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + KStreamTestDriver 
driver = new KStreamTestDriver(builder, baseDir); + + driver.process(topic1, "A", "1"); + driver.process(topic1, "B", "2"); + driver.process(topic1, "A", "3"); + driver.process(topic1, "B", "4"); + driver.process(topic1, "C", "5"); + driver.process(topic1, "D", "6"); + driver.process(topic1, "B", "7"); + driver.process(topic1, "C", "8"); + + assertEquals(Utils.mkList( + "A:+1", + "B:+2", + "A:+1+3", "A:+1+3-1", + "B:+2+4", "B:+2+4-2", + "C:+5", + "D:+6", + "B:+2+4-2+7", "B:+2+4-2+7-4", + "C:+5+8", "C:+5+8-5"), proc2.processed); + + } finally { + Utils.delete(baseDir); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 037b30a9864..58f1c2a127d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -39,7 +39,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class KTableMapValuesImplTest { +public class KTableMapValuesTest { private final Serializer strSerializer = new StringSerializer(); private final Deserializer strDeserializer = new StringDeserializer(); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java new file mode 100644 index 00000000000..98aa0d46ee5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; + +public class NoOpKeyValueMapper implements KeyValueMapper> { + + @Override + public KeyValue apply(K key, V value) { + return new KeyValue<>(key, value); + } +}
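// Reviewer sketch, not part of the patch: count() could be exercised with the same
// KStreamTestDriver scaffolding as testAggBasic above (reusing its strSerializer/strDeserializer
// fields and imports). Because every update of an existing key is split into an addition and a
// retraction on the repartition topic, each such update would be expected to surface as two
// consecutive counts, analogous to the "+x" / "-x" pairs asserted in that test; no exact output
// is asserted here.
@Test
public void testCountSketch() throws Exception {
    final File baseDir = Files.createTempDirectory("test").toFile();

    try {
        final KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";

        KTable<String, String> table1 =
                builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);

        // count how many keys currently carry each value
        KTable<String, Long> counts = table1.count(
                new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String key, String value) {
                        return value;
                    }
                },
                strSerializer, strSerializer,
                strDeserializer, strDeserializer,
                "topic1-Counts");

        MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
        counts.toStream().process(proc);

        KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);

        driver.process(topic1, "A", "green");
        driver.process(topic1, "B", "green");   // "green" now held by two keys
        driver.process(topic1, "A", "blue");    // A's update adds to "blue" and retracts from "green"
    } finally {
        Utils.delete(baseDir);
    }
}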