KAFKA-2653: Add KStream/KTable Aggregation and KTable Join APIs

ping ymatsuda for reviews.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #730 from guozhangwang/K2653r
This commit is contained in:
Guozhang Wang 2016-01-07 17:18:33 -08:00
parent 4836e525c8
commit 40d731b871
39 changed files with 1361 additions and 108 deletions

View File

@ -45,7 +45,7 @@ public class KStreamJob {
KStreamBuilder builder = new KStreamBuilder(); KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream1 = builder.from("topic1"); KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, Integer> stream2 = KStream<String, Integer> stream2 =
stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() { stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.KafkaStreaming;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.Properties;
public class KTableJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable");
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
StreamingConfig config = new StreamingConfig(props);
Serializer<String> stringSerializer = new StringSerializer();
Deserializer<String> stringDeserializer = new StringDeserializer();
KStreamBuilder builder = new KStreamBuilder();
// stream aggregate
KStream<String, Long> stream1 = builder.stream("topic1");
@SuppressWarnings("unchecked")
KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() {
@Override
public long apply(String key, Long value) {
return value;
}
}, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer);
// table aggregation
KTable<String, String> table1 = builder.table("topic2");
KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return value;
}
}, new KeyValueToLongMapper<String, String>() {
@Override
public long apply(String key, String value) {
return Long.parseLong(value);
}
}, stringSerializer, stringDeserializer, "table2");
// stream-table join
KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() {
@Override
public Long apply(Long value1, Long value2) {
if (value2 == null)
return 0L;
else
return value1 * value2;
}
});
// table-table join
KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() {
@Override
public String apply(String value1, Long value2) {
if (value2 == null)
return value1 + "-null";
else if (value1 == null)
return "null-" + value2;
else
return value1 + "-" + value2;
}
});
wtable1.to("topic3");
KafkaStreaming kstream = new KafkaStreaming(builder, config);
kstream.start();
}
}

View File

@ -17,22 +17,26 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
/** public interface Aggregator<K, V, T> {
* KStreamWindowed is an abstraction of a stream of key-value pairs with a window. /**
*/ * Set the initial aggregate value
public interface KStreamWindowed<K, V> extends KStream<K, V> { */
T initialValue();
/** /**
* Creates a new stream by joining this windowed stream with the other windowed stream. * When a new record with the aggregate key is added,
* Each element arrived from either of the streams is joined with elements in a window of each other. * updating the aggregate value for this key
* The resulting values are computed by applying a joiner.
*
* @param other the other windowed stream
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream
* @return KStream
*/ */
<V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner); T add(K aggKey, V value, T aggregate);
/**
* when an old record with the aggregate key is removed,
* updating the aggregate value for this key
*/
T remove(K aggKey, V value, T aggregate);
/**
* Merge two aggregate values
*/
T merge(T aggr1, T aggr2);
} }

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface AggregatorSupplier<K, V, T> {
Aggregator<K, V, T> get();
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.HoppingWindow;
import java.util.Collection;
import java.util.Collections;
public class HoppingWindows extends Windows<HoppingWindow> {
private static final long DEFAULT_SIZE_MS = 1000L;
public final long size;
public final long period;
private HoppingWindows(String name, long size, long period) {
super(name);
this.size = size;
this.period = period;
}
/**
* Returns a half-interval hopping window definition with the window size in milliseconds
* of the form &#91; N &#42; default_size, N &#42; default_size + default_size &#41;
*/
public static HoppingWindows of(String name) {
return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS);
}
/**
* Returns a new hopping window definition with the original size but reassign the window
* period in milliseconds of the form &#91; N &#42; period, N &#42; period + size &#41;
*/
public HoppingWindows with(long size) {
return new HoppingWindows(this.name, size, this.period);
}
/**
* Returns a new hopping window definition with the original size but reassign the window
* period in milliseconds of the form &#91; N &#42; period, N &#42; period + size &#41;
*/
public HoppingWindows every(long period) {
return new HoppingWindows(this.name, this.size, period);
}
@Override
public Collection<HoppingWindow> windowsFor(long timestamp) {
// TODO
return Collections.<HoppingWindow>emptyList();
}
@Override
public boolean equalTo(Windows other) {
if (!other.getClass().equals(HoppingWindows.class))
return false;
HoppingWindows otherWindows = (HoppingWindows) other;
return this.size == otherWindows.size && this.period == otherWindows.period;
}
}

View File

@ -17,27 +17,32 @@
package org.apache.kafka.streams.kstream; package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.SlidingWindow;
import java.util.Collection;
/** /**
* This class is used to specify the behaviour of windowed joins. * This class is used to specify the behaviour of windowed joins.
*/ */
public class JoinWindowSpec { public class JoinWindows extends Windows<SlidingWindow> {
private static final int DEFAULT_NUM_SEGMENTS = 3;
public final String name;
public final long before; public final long before;
public final long after; public final long after;
public final long retention;
public final int segments; public final int segments;
private JoinWindowSpec(String name, long before, long after, long retention, int segments) { private JoinWindows(String name, long before, long after, int segments) {
this.name = name; super(name);
this.after = after; this.after = after;
this.before = before; this.before = before;
this.retention = retention;
this.segments = segments; this.segments = segments;
} }
public static JoinWindowSpec of(String name) { public static JoinWindows of(String name) {
return new JoinWindowSpec(name, 0L, 0L, 0L, 3); return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS);
} }
/** /**
@ -47,8 +52,8 @@ public class JoinWindowSpec {
* @param timeDifference * @param timeDifference
* @return * @return
*/ */
public JoinWindowSpec within(long timeDifference) { public JoinWindows within(long timeDifference) {
return new JoinWindowSpec(name, timeDifference, timeDifference, retention, segments); return new JoinWindows(this.name, timeDifference, timeDifference, this.segments);
} }
/** /**
@ -59,8 +64,8 @@ public class JoinWindowSpec {
* @param timeDifference * @param timeDifference
* @return * @return
*/ */
public JoinWindowSpec before(long timeDifference) { public JoinWindows before(long timeDifference) {
return new JoinWindowSpec(name, timeDifference, 0L, retention, segments); return new JoinWindows(this.name, timeDifference, this.after, this.segments);
} }
/** /**
@ -71,21 +76,35 @@ public class JoinWindowSpec {
* @param timeDifference * @param timeDifference
* @return * @return
*/ */
public JoinWindowSpec after(long timeDifference) { public JoinWindows after(long timeDifference) {
return new JoinWindowSpec(name, 0L, timeDifference, retention, segments); return new JoinWindows(this.name, this.before, timeDifference, this.segments);
} }
/** /**
* Specifies the retention period of windows * Specifies the number of segments to be used for rolling the window store,
* @param retentionPeriod * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
*
* @param segments
* @return * @return
*/ */
public JoinWindowSpec retentionPeriod(long retentionPeriod) { protected JoinWindows segments(int segments) {
return new JoinWindowSpec(name, before, after, retentionPeriod, segments); return new JoinWindows(name, before, after, segments);
} }
public JoinWindowSpec segments(int segments) { @Override
return new JoinWindowSpec(name, before, after, retention, segments); public Collection<SlidingWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}
@Override
public boolean equalTo(Windows other) {
if (!other.getClass().equals(JoinWindows.class))
return false;
JoinWindows otherWindows = (JoinWindows) other;
return this.before == otherWindows.before && this.after == otherWindows.after;
} }
} }

View File

@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Collection;
/** /**
* KStream is an abstraction of a stream of key-value pairs. * KStream is an abstraction of a stream of key-value pairs.
* *
@ -141,7 +143,7 @@ public interface KStream<K, V> {
/** /**
* Applies a stateful transformation to all elements in this stream. * Applies a stateful transformation to all elements in this stream.
* *
* @param transformerSupplier the class of TransformerDef * @param transformerSupplier the class of valueTransformerSupplier
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @return the instance of KStream that contains transformed keys and values * @return the instance of KStream that contains transformed keys and values
*/ */
@ -150,7 +152,7 @@ public interface KStream<K, V> {
/** /**
* Applies a stateful transformation to all values in this stream. * Applies a stateful transformation to all values in this stream.
* *
* @param valueTransformerSupplier the class of TransformerDef * @param valueTransformerSupplier the class of valueTransformerSupplier
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @return the instance of KStream that contains the keys and transformed values * @return the instance of KStream that contains the keys and transformed values
*/ */
@ -169,7 +171,7 @@ public interface KStream<K, V> {
* *
* @param otherStream the instance of KStream joined with this stream * @param otherStream the instance of KStream joined with this stream
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param joinWindowSpec the specification of the join window * @param windows the specification of the join window
* @param keySerializer key serializer, * @param keySerializer key serializer,
* if not specified the default serializer defined in the configs will be used * if not specified the default serializer defined in the configs will be used
* @param thisValueSerializer value serializer for this stream, * @param thisValueSerializer value serializer for this stream,
@ -188,7 +190,7 @@ public interface KStream<K, V> {
<V1, V2> KStream<K, V2> join( <V1, V2> KStream<K, V2> join(
KStream<K, V1> otherStream, KStream<K, V1> otherStream,
ValueJoiner<V, V1, V2> joiner, ValueJoiner<V, V1, V2> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerializer, Serializer<K> keySerializer,
Serializer<V> thisValueSerializer, Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer, Serializer<V1> otherValueSerializer,
@ -201,7 +203,7 @@ public interface KStream<K, V> {
* *
* @param otherStream the instance of KStream joined with this stream * @param otherStream the instance of KStream joined with this stream
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param joinWindowSpec the specification of the join window * @param windows the specification of the join window
* @param keySerializer key serializer, * @param keySerializer key serializer,
* if not specified the default serializer defined in the configs will be used * if not specified the default serializer defined in the configs will be used
* @param thisValueSerializer value serializer for this stream, * @param thisValueSerializer value serializer for this stream,
@ -220,7 +222,7 @@ public interface KStream<K, V> {
<V1, V2> KStream<K, V2> outerJoin( <V1, V2> KStream<K, V2> outerJoin(
KStream<K, V1> otherStream, KStream<K, V1> otherStream,
ValueJoiner<V, V1, V2> joiner, ValueJoiner<V, V1, V2> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerializer, Serializer<K> keySerializer,
Serializer<V> thisValueSerializer, Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer, Serializer<V1> otherValueSerializer,
@ -233,6 +235,7 @@ public interface KStream<K, V> {
* *
* @param otherStream the instance of KStream joined with this stream * @param otherStream the instance of KStream joined with this stream
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param windows the specification of the join window
* @param keySerializer key serializer, * @param keySerializer key serializer,
* if not specified the default serializer defined in the configs will be used * if not specified the default serializer defined in the configs will be used
* @param otherValueSerializer value serializer for other stream, * @param otherValueSerializer value serializer for other stream,
@ -247,7 +250,7 @@ public interface KStream<K, V> {
<V1, V2> KStream<K, V2> leftJoin( <V1, V2> KStream<K, V2> leftJoin(
KStream<K, V1> otherStream, KStream<K, V1> otherStream,
ValueJoiner<V, V1, V2> joiner, ValueJoiner<V, V1, V2> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerializer, Serializer<K> keySerializer,
Serializer<V1> otherValueSerializer, Serializer<V1> otherValueSerializer,
Deserializer<K> keyDeserializer, Deserializer<K> keyDeserializer,
@ -258,9 +261,79 @@ public interface KStream<K, V> {
* *
* @param ktable the instance of KTable joined with this stream * @param ktable the instance of KTable joined with this stream
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param <V1> the value type of the other stream * @param <V1> the value type of the table
* @param <V2> the value type of the new stream * @param <V2> the value type of the new stream
*/ */
<V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner); <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner);
/**
* Aggregate values of this stream by key on a window basis.
*
* @param aggregatorSupplier the class of aggregatorSupplier
* @param windows the specification of the aggregation window
* @param <T> the value type of the aggregated table
*/
<T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer);
/**
* Sum extracted long integer values of this stream by key on a window basis.
*
* @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
* @param windows the specification of the aggregation window
*/
<W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer);
/**
* Sum extracted integer values of this stream by key on a window basis.
*
* @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
* @param windows the specification of the aggregation window
*/
<W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer);
/**
* Sum extracted double decimal values of this stream by key on a window basis.
*
* @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
* @param windows the specification of the aggregation window
*/
<W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer);
/**
* Count number of records of this stream by key on a window basis.
*
* @param windows the specification of the aggregation window
*/
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer);
/**
* Get the top-k values of this stream by key on a window basis.
*
* @param k parameter of the top-k computation
* @param valueSelector the class of KeyValueMapper to extract the comparable value
* @param windows the specification of the aggregation window
*/
<W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
KeyValueMapper<K, V, V1> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<V1> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> aggValueDeserializer);
} }

View File

@ -46,8 +46,8 @@ public class KStreamBuilder extends TopologyBuilder {
* @param topics the topic names, if empty default to all the topics in the config * @param topics the topic names, if empty default to all the topics in the config
* @return KStream * @return KStream
*/ */
public <K, V> KStream<K, V> from(String... topics) { public <K, V> KStream<K, V> stream(String... topics) {
return from(null, null, topics); return stream(null, null, topics);
} }
/** /**
@ -60,7 +60,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @param topics the topic names, if empty default to all the topics in the config * @param topics the topic names, if empty default to all the topics in the config
* @return KStream * @return KStream
*/ */
public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) { public <K, V> KStream<K, V> stream(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
String name = newName(KStreamImpl.SOURCE_NAME); String name = newName(KStreamImpl.SOURCE_NAME);
addSource(name, keyDeserializer, valDeserializer, topics); addSource(name, keyDeserializer, valDeserializer, topics);

View File

@ -20,6 +20,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import java.util.Collection;
/** /**
* KTable is an abstraction of a change log stream. * KTable is an abstraction of a change log stream.
* *
@ -112,7 +114,7 @@ public interface KTable<K, V> {
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param <V1> the value type of the other stream * @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream * @param <V2> the value type of the new stream
* @return the instance of KStream * @return the instance of KTable
*/ */
<V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner); <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
@ -123,7 +125,7 @@ public interface KTable<K, V> {
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param <V1> the value type of the other stream * @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream * @param <V2> the value type of the new stream
* @return the instance of KStream * @return the instance of KTable
*/ */
<V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner); <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
@ -134,8 +136,90 @@ public interface KTable<K, V> {
* @param joiner ValueJoiner * @param joiner ValueJoiner
* @param <V1> the value type of the other stream * @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream * @param <V2> the value type of the new stream
* @return the instance of KStream * @return the instance of KTable
*/ */
<V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner); <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
/**
* Aggregate values of this table by the selected key.
*
* @param aggregatorSupplier the class of AggregatorSupplier
* @param selector the KeyValue mapper that select the aggregate key
* @param name the name of the resulted table
* @param <K1> the key type of the aggregated table
* @param <V1> the value type of the aggregated table
* @return the instance of KTable
*/
<K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K> keySerializer,
Serializer<V2> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V2> aggValueDeserializer,
String name);
/**
* Sum extracted long integer values of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
* @param name the name of the resulted table
*/
<K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToLongMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name);
/**
* Sum extracted integer values of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
* @param name the name of the resulted table
*/
<K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToIntMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name);
/**
* Sum extracted double decimal values of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
* @param name the name of the resulted table
*/
<K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToDoubleMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name);
/**
* Count number of records of this table by the selected aggregation key
*
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param name the name of the resulted table
*/
<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name);
/**
* Get the top-k values of this table by the selected aggregation key
*
* @param k parameter of the top-k computation
* @param keySelector the class of KeyValueMapper to select the aggregation key
* @param name the name of the resulted table
*/
<K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Serializer<V1> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> aggValueDeserializer,
String name);
} }

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface KeyValueToDoubleMapper<K, V> {
double apply(K key, V value);
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface KeyValueToIntMapper<K, V> {
int apply(K key, V value);
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public interface KeyValueToLongMapper<K, V> {
long apply(K key, V value);
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.SlidingWindow;
import java.util.Collection;
import java.util.Collections;
public class SlidingWindows extends Windows<SlidingWindow> {
private static final long DEFAULT_SIZE_MS = 1000L;
public final long size;
private SlidingWindows(String name, long size) {
super(name);
this.size = size;
}
/**
* Returns a half-interval sliding window definition with the default window size
*/
public static SlidingWindows of(String name) {
return new SlidingWindows(name, DEFAULT_SIZE_MS);
}
/**
* Returns a half-interval sliding window definition with the window size in milliseconds
*/
public SlidingWindows with(long size) {
return new SlidingWindows(this.name, size);
}
@Override
public Collection<SlidingWindow> windowsFor(long timestamp) {
// TODO
return Collections.<SlidingWindow>emptyList();
}
@Override
public boolean equalTo(Windows other) {
if (!other.getClass().equals(SlidingWindows.class))
return false;
SlidingWindows otherWindows = (SlidingWindows) other;
return this.size == otherWindows.size;
}
}

View File

@ -20,5 +20,4 @@ package org.apache.kafka.streams.kstream;
public interface TransformerSupplier<K, V, R> { public interface TransformerSupplier<K, V, R> {
Transformer<K, V, R> get(); Transformer<K, V, R> get();
} }

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import java.util.Collection;
import java.util.Collections;
public class UnlimitedWindows extends Windows<UnlimitedWindow> {
private static final long DEFAULT_START_TIMESTAMP = 0L;
public final long start;
private UnlimitedWindows(String name, long start) {
super(name);
this.start = start;
}
/**
* Returns an unlimited window definition
*/
public static UnlimitedWindows of(String name) {
return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
}
public UnlimitedWindows startOn(long start) {
return new UnlimitedWindows(this.name, start);
}
@Override
public Collection<UnlimitedWindow> windowsFor(long timestamp) {
// TODO
return Collections.<UnlimitedWindow>emptyList();
}
@Override
public boolean equalTo(Windows other) {
if (!other.getClass().equals(UnlimitedWindows.class))
return false;
UnlimitedWindows otherWindows = (UnlimitedWindows) other;
return this.start == otherWindows.start;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public abstract class Window {
private long start;
private long end;
public Window(long start, long end) {
this.start = start;
this.end = end;
}
/**
* Returns the start timestamp of this window, inclusive
*/
public long start() {
return start;
}
/**
* Returns the end timestamp of this window, exclusive
*/
public long end() {
return end;
}
public boolean overlap(Window other) {
return this.start() < other.end() || other.start() < this.end();
}
public boolean equalsTo(Window other) {
return this.start() == other.start() && this.end() == other.end();
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
public class Windowed<T> {
private T value;
private Window window;
public Windowed(T value, Window window) {
this.value = value;
this.window = window;
}
public T value() {
return value;
}
public Window window() {
return window;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class Windows<W extends Window> {
private static final long DEFAULT_EMIT_DURATION = 1000L;
private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day
private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
private long emitDuration;
private long maintainDuration;
protected String name;
protected Windows(String name) {
this.name = name;
this.emitDuration = DEFAULT_EMIT_DURATION;
this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
}
public String name() {
return name;
}
/**
* Set the window emit duration in milliseconds of system time
*/
public Windows emit(long duration) {
this.emitDuration = duration;
return this;
}
/**
* Set the window maintain duration in milliseconds of system time
*/
public Windows until(long duration) {
this.maintainDuration = duration;
return this;
}
public long emitEveryMs() {
return this.emitDuration;
}
public long maintainMs() {
return this.maintainDuration;
}
protected String newName(String prefix) {
return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
}
abstract boolean equalTo(Windows other);
abstract Collection<W> windowsFor(long timestamp);
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.Windowed;
import java.nio.ByteBuffer;
import java.util.Map;
public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
private static final int TIMESTAMP_SIZE = 8;
private Deserializer<T> inner;
public DefaultWindowedDeserializer(Deserializer<T> inner) {
this.inner = inner;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public Windowed<T> deserialize(String topic, byte[] data) {
byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
System.arraycopy(data, 0, bytes, 0, bytes.length);
long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
// always read as unlimited window
return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
}
@Override
public void close() {
inner.close();
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Windowed;
import java.nio.ByteBuffer;
import java.util.Map;
public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> {
private static final int TIMESTAMP_SIZE = 8;
private Serializer<T> inner;
public DefaultWindowedSerializer(Serializer<T> inner) {
this.inner = inner;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public byte[] serialize(String topic, Windowed<T> data) {
byte[] serializedKey = inner.serialize(topic, data.value());
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
buf.put(serializedKey);
buf.putLong(data.window().start());
return buf.array();
}
@Override
public void close() {
inner.close();
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Window;
public class HoppingWindow extends Window {
public HoppingWindow(long start, long end) {
super(start, end);
}
@Override
public boolean overlap(Window other) {
return super.overlap(other) && other.getClass().equals(HoppingWindow.class);
}
@Override
public boolean equalsTo(Window other) {
return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class);
}
}

View File

@ -19,10 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.JoinWindowSpec; import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@ -30,11 +34,15 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes; import org.apache.kafka.streams.state.Serdes;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -62,6 +70,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-"; private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
public static final String SINK_NAME = "KSTREAM-SINK-"; public static final String SINK_NAME = "KSTREAM-SINK-";
public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-"; public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
@ -187,7 +197,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<V> valDeserializer) { Deserializer<V> valDeserializer) {
to(topic, keySerializer, valSerializer); to(topic, keySerializer, valSerializer);
return topology.from(keyDeserializer, valDeserializer, topic); return topology.stream(keyDeserializer, valDeserializer, topic);
} }
@Override @Override
@ -239,7 +249,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1, R> KStream<K, R> join( public <V1, R> KStream<K, R> join(
KStream<K, V1> other, KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner, ValueJoiner<V, V1, R> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerialzier, Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier, Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier, Serializer<V1> otherValueSerialzier,
@ -247,7 +257,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<V> thisValueDeserialzier, Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier) { Deserializer<V1> otherValueDeserialzier) {
return join(other, joiner, joinWindowSpec, return join(other, joiner, windows,
keySerialzier, thisValueSerialzier, otherValueSerialzier, keySerialzier, thisValueSerialzier, otherValueSerialzier,
keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false); keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
} }
@ -256,7 +266,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1, R> KStream<K, R> outerJoin( public <V1, R> KStream<K, R> outerJoin(
KStream<K, V1> other, KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner, ValueJoiner<V, V1, R> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerialzier, Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier, Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier, Serializer<V1> otherValueSerialzier,
@ -264,7 +274,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<V> thisValueDeserialzier, Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier) { Deserializer<V1> otherValueDeserialzier) {
return join(other, joiner, joinWindowSpec, return join(other, joiner, windows,
keySerialzier, thisValueSerialzier, otherValueSerialzier, keySerialzier, thisValueSerialzier, otherValueSerialzier,
keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true); keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
} }
@ -273,7 +283,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private <V1, R> KStream<K, R> join( private <V1, R> KStream<K, R> join(
KStream<K, V1> other, KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner, ValueJoiner<V, V1, R> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerialzier, Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier, Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier, Serializer<V1> otherValueSerialzier,
@ -286,21 +296,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
RocksDBWindowStoreSupplier<K, V> thisWindow = RocksDBWindowStoreSupplier<K, V> thisWindow =
new RocksDBWindowStoreSupplier<>( new RocksDBWindowStoreSupplier<>(
joinWindowSpec.name + "-1", windows.name() + "-this",
joinWindowSpec.before, windows.before,
joinWindowSpec.after, windows.after,
joinWindowSpec.retention, windows.maintainMs(),
joinWindowSpec.segments, windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier), new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
null); null);
RocksDBWindowStoreSupplier<K, V1> otherWindow = RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>( new RocksDBWindowStoreSupplier<>(
joinWindowSpec.name + "-2", windows.name() + "-other",
joinWindowSpec.after, windows.before,
joinWindowSpec.before, windows.after,
joinWindowSpec.retention, windows.maintainMs(),
joinWindowSpec.segments, windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
null); null);
@ -333,7 +343,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1, R> KStream<K, R> leftJoin( public <V1, R> KStream<K, R> leftJoin(
KStream<K, V1> other, KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner, ValueJoiner<V, V1, R> joiner,
JoinWindowSpec joinWindowSpec, JoinWindows windows,
Serializer<K> keySerialzier, Serializer<K> keySerialzier,
Serializer<V1> otherValueSerialzier, Serializer<V1> otherValueSerialzier,
Deserializer<K> keyDeserialier, Deserializer<K> keyDeserialier,
@ -343,11 +353,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
RocksDBWindowStoreSupplier<K, V1> otherWindow = RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>( new RocksDBWindowStoreSupplier<>(
joinWindowSpec.name, windows.name() + "-this",
joinWindowSpec.after, windows.before,
joinWindowSpec.before, windows.after,
joinWindowSpec.retention, windows.maintainMs(),
joinWindowSpec.segments, windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
null); null);
@ -376,4 +386,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return new KStreamImpl<>(topology, name, allSourceNodes); return new KStreamImpl<>(topology, name, allSourceNodes);
} }
@Override
public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer) {
// TODO
return null;
}
@Override
public <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer) {
// TODO
return null;
}
public <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer) {
// TODO
return null;
}
public <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer) {
// TODO
return null;
}
@Override
public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer) {
// TODO
return null;
}
@Override
public <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
KeyValueMapper<K, V, V1> valueSelector,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<V1> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> aggValueDeserializer) {
// TODO
return null;
}
} }

View File

@ -19,15 +19,22 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier;
import java.util.Collection;
import java.util.Set; import java.util.Set;
/** /**
@ -271,4 +278,66 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes); return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
} }
@Override
public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K> keySerializer,
Serializer<V2> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V2> aggValueDeserializer,
String name) {
// TODO
return null;
}
@Override
public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToLongMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name) {
// TODO
return null;
}
@Override
public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToIntMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name) {
// TODO
return null;
}
@Override
public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
KeyValueToDoubleMapper<K, V> valueSelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name) {
// TODO
return null;
}
@Override
public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
String name) {
// TODO
return null;
}
@Override
public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
KeyValueMapper<K, V, K1> keySelector,
Serializer<K> keySerializer,
Serializer<V1> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V1> aggValueDeserializer,
String name) {
// TODO
return null;
}
} }

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Window;
public class SlidingWindow extends Window {
public SlidingWindow(long start, long end) {
super(start, end);
}
@Override
public boolean overlap(Window other) {
return super.overlap(other) && other.getClass().equals(SlidingWindow.class);
}
@Override
public boolean equalsTo(Window other) {
return super.equalsTo(other) && other.getClass().equals(SlidingWindow.class);
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.Aggregator;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
/**
* NOTE: This is just a demo aggregate supplier that can be implemented by users to add their own built-in aggregates.
* It is highly in-efficient and is not supposed to be merged in.
*/
public class TopKSupplier<K, V extends Comparable<V>> implements AggregatorSupplier<K, V, Collection<V>> {
private final int k;
public TopKSupplier(int k) {
this.k = k;
}
private class TopK implements Aggregator<K, V, Collection<V>> {
private final Map<K, PriorityQueue<V>> sorted = new HashMap<>();
@Override
public Collection<V> initialValue() {
return Collections.<V>emptySet();
}
@Override
public Collection<V> add(K aggKey, V value, Collection<V> aggregate) {
PriorityQueue<V> queue = sorted.get(aggKey);
if (queue == null) {
queue = new PriorityQueue<>();
sorted.put(aggKey, queue);
}
queue.add(value);
PriorityQueue<V> copy = new PriorityQueue<>(queue);
Set<V> ret = new HashSet<>();
for (int i = 1; i <= k; i++)
ret.add(copy.poll());
return ret;
}
@Override
public Collection<V> remove(K aggKey, V value, Collection<V> aggregate) {
PriorityQueue<V> queue = sorted.get(aggKey);
if (queue == null)
throw new IllegalStateException("This should not happen.");
queue.remove(value);
PriorityQueue<V> copy = new PriorityQueue<>(queue);
Set<V> ret = new HashSet<>();
for (int i = 1; i <= k; i++)
ret.add(copy.poll());
return ret;
}
@Override
public Collection<V> merge(Collection<V> aggr1, Collection<V> aggr2) {
PriorityQueue<V> copy = new PriorityQueue<>(aggr1);
copy.addAll(aggr2);
Set<V> ret = new HashSet<>();
for (int i = 1; i <= k; i++)
ret.add(copy.poll());
return ret;
}
}
@Override
public Aggregator<K, V, Collection<V>> get() {
return new TopK();
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Window;
public class UnlimitedWindow extends Window {
public UnlimitedWindow(long start) {
super(start, Long.MAX_VALUE);
}
@Override
public boolean overlap(Window other) {
return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class);
}
@Override
public boolean equalsTo(Window other) {
return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class);
}
}

View File

@ -32,7 +32,7 @@ public class KStreamBuilderTest {
public void testFrom() { public void testFrom() {
final KStreamBuilder builder = new KStreamBuilder(); final KStreamBuilder builder = new KStreamBuilder();
builder.from("topic-1", "topic-2"); builder.stream("topic-1", "topic-2");
builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3"); builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
} }
@ -59,8 +59,8 @@ public class KStreamBuilderTest {
KStreamBuilder builder = new KStreamBuilder(); KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source1 = builder.from(topic1); KStream<String, String> source1 = builder.stream(topic1);
KStream<String, String> source2 = builder.from(topic2); KStream<String, String> source2 = builder.stream(topic2);
KStream<String, String> merged = builder.merge(source1, source2); KStream<String, String> merged = builder.merge(source1, source2);
MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();

View File

@ -67,7 +67,7 @@ public class KStreamBranchTest {
KStream<Integer, String>[] branches; KStream<Integer, String>[] branches;
MockProcessorSupplier<Integer, String>[] processors; MockProcessorSupplier<Integer, String>[] processors;
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
branches = stream.branch(isEven, isMultipleOfThree, isOdd); branches = stream.branch(isEven, isMultipleOfThree, isOdd);
assertEquals(3, branches.length); assertEquals(3, branches.length);

View File

@ -52,7 +52,7 @@ public class KStreamFilterTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.filter(isMultipleOfThree).process(processor); stream.filter(isMultipleOfThree).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);
@ -72,7 +72,7 @@ public class KStreamFilterTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.filterOut(isMultipleOfThree).process(processor); stream.filterOut(isMultipleOfThree).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);

View File

@ -60,7 +60,7 @@ public class KStreamFlatMapTest {
MockProcessorSupplier<String, String> processor; MockProcessorSupplier<String, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.flatMap(mapper).process(processor); stream.flatMap(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);

View File

@ -58,7 +58,7 @@ public class KStreamFlatMapValuesTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.flatMapValues(mapper).process(processor); stream.flatMapValues(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);

View File

@ -18,10 +18,16 @@
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test; import org.junit.Test;
@ -35,12 +41,16 @@ public class KStreamImplTest {
@Test @Test
public void testNumProcesses() { public void testNumProcesses() {
final Deserializer<String> deserializer = new StringDeserializer(); final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Integer> integerSerializer = new IntegerSerializer();
final Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
final KStreamBuilder builder = new KStreamBuilder(); final KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source1 = builder.from(deserializer, deserializer, "topic-1", "topic-2"); KStream<String, String> source1 = builder.stream(stringDeserializer, stringDeserializer, "topic-1", "topic-2");
KStream<String, String> source2 = builder.from(deserializer, deserializer, "topic-3", "topic-4"); KStream<String, String> source2 = builder.stream(stringDeserializer, stringDeserializer, "topic-3", "topic-4");
KStream<String, String> stream1 = KStream<String, String> stream1 =
source1.filter(new Predicate<String, String>() { source1.filter(new Predicate<String, String>() {
@ -99,7 +109,21 @@ public class KStreamImplTest {
} }
); );
streams2[0].to("topic-5"); KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
}, JoinWindows.of("join-0"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
}, JoinWindows.of("join-1"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
stream4.to("topic-5");
streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>()); streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>());
@ -109,6 +133,7 @@ public class KStreamImplTest {
1 + // stream3 1 + // stream3
1 + 2 + // streams2 1 + 2 + // streams2
1 + 2 + // streams3 1 + 2 + // streams3
5 * 2 + // stream2-stream3 joins
1 + // to 1 + // to
2 + // through 2 + // through
1, // process 1, // process

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.JoinWindowSpec; import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
@ -71,9 +71,9 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
stream2 = builder.from(keyDeserializer, valDeserializer, topic2); stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100), joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer); keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
joined.process(processor); joined.process(processor);
@ -177,9 +177,9 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
stream2 = builder.from(keyDeserializer, valDeserializer, topic2); stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
joined = stream1.outerJoin(stream2, joiner, JoinWindowSpec.of("test").within(100), joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100),
keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer); keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
joined.process(processor); joined.process(processor);
@ -285,9 +285,9 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
stream2 = builder.from(keyDeserializer, valDeserializer, topic2); stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100), joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer); keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
joined.process(processor); joined.process(processor);

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.JoinWindowSpec; import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
@ -71,9 +71,9 @@ public class KStreamKStreamLeftJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
stream2 = builder.from(keyDeserializer, valDeserializer, topic2); stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100), joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
keySerializer, valSerializer, keyDeserializer, valDeserializer); keySerializer, valSerializer, keyDeserializer, valDeserializer);
joined.process(processor); joined.process(processor);
@ -157,9 +157,9 @@ public class KStreamKStreamLeftJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
stream2 = builder.from(keyDeserializer, valDeserializer, topic2); stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100), joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
keySerializer, valSerializer, keyDeserializer, valDeserializer); keySerializer, valSerializer, keyDeserializer, valDeserializer);
joined.process(processor); joined.process(processor);

View File

@ -81,7 +81,7 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topic1); stream = builder.stream(keyDeserializer, valDeserializer, topic1);
table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2); table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
stream.leftJoin(table, joiner).process(processor); stream.leftJoin(table, joiner).process(processor);
@ -162,7 +162,7 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor; MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper); stream = builder.stream(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2); table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
stream.leftJoin(table, joiner).process(processor); stream.leftJoin(table, joiner).process(processor);

View File

@ -54,7 +54,7 @@ public class KStreamMapTest {
MockProcessorSupplier<String, Integer> processor; MockProcessorSupplier<String, Integer> processor;
processor = new MockProcessorSupplier<>(); processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.map(mapper).process(processor); stream.map(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);

View File

@ -51,7 +51,7 @@ public class KStreamMapValuesTest {
KStream<Integer, String> stream; KStream<Integer, String> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.mapValues(mapper).process(processor); stream.mapValues(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);

View File

@ -73,7 +73,7 @@ public class KStreamTransformTest {
KStream<Integer, Integer> stream; KStream<Integer, Integer> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.transform(transformerSupplier).process(processor); stream.transform(transformerSupplier).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);

View File

@ -72,7 +72,7 @@ public class KStreamTransformValuesTest {
KStream<Integer, Integer> stream; KStream<Integer, Integer> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.from(keyDeserializer, valDeserializer, topicName); stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.transformValues(valueTransformerSupplier).process(processor); stream.transformValues(valueTransformerSupplier).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder); KStreamTestDriver driver = new KStreamTestDriver(builder);