KAFKA-5832; add Consumed and change StreamsBuilder to use it

Added `Consumed` class.
Updated `StreamsBuilder#stream`, `StreamsBuilder#table`, and `StreamsBuilder#globalTable` to use it.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3784 from dguy/kip-182-stream-builder
Damian Guy 2017-09-08 08:21:48 +01:00
parent 27336192ff
commit d0ee6ed36b
47 changed files with 576 additions and 524 deletions
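
For orientation, the sketch below shows the shape of the consolidated API this patch introduces. It is a minimal illustration rather than code from the patch; the topic names, serdes, and reset policy are placeholder choices.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class ConsumedUsageSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // All source-level options (serdes, offset reset policy, timestamp extractor)
        // are now grouped in a single Consumed object instead of separate overloads.
        final KStream<String, Long> stream = builder.stream(
                "input-topic",                                   // placeholder topic
                Consumed.with(Serdes.String(), Serdes.Long())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

        final KTable<String, Long> table = builder.table(
                "table-topic", Consumed.with(Serdes.String(), Serdes.Long()));

        final GlobalKTable<String, Long> globalTable = builder.globalTable(
                "global-topic", Consumed.with(Serdes.String(), Serdes.Long()));
    }
}

The overloads without a Consumed argument remain available and fall back to the defaults from StreamsConfig.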


@ -547,9 +547,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
StreamsBuilder builder = new StreamsBuilder();
KStream&lt;String, Long&gt; wordCounts = builder.stream(
Serdes.String(), /* key serde */
Serdes.Long(), /* value serde */
"word-counts-input-topic" /* input topic */);
"word-counts-input-topic" /* input topic */,
Consumed.with(Serdes.String(), Serdes.Long())); // define key and value serdes
</pre>
When to provide serdes explicitly:
<ul>
@ -2427,7 +2426,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
StreamsConfig config = new StreamsConfig(props);
StreamsBuilder builder = new StreamsBuilder();
KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "word-count-input");
KStream&lt;String, String&gt; textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde);
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))


@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -142,7 +143,7 @@ public class PageViewTypedDemo {
pageViewByRegionDeserializer.configure(serdeProps, false);
final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");
KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde,
"streams-userprofile-input", "streams-userprofile-store-name");


@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -72,7 +73,7 @@ public class PageViewUntypedDemo {
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStream<String, JsonNode> views = builder.stream(Serdes.String(), jsonSerde, "streams-pageview-input");
KStream<String, JsonNode> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), jsonSerde));
KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde,
"streams-userprofile-input", "streams-userprofile-store-name");


@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TimestampExtractor;
/**
* The {@code Consumed} class is used to define the optional parameters when using {@link StreamsBuilder} to
* build instances of {@link KStream}, {@link KTable}, and {@link GlobalKTable}.
*
* @param <K> type of record key
* @param <V> type of record value
*/
public class Consumed<K, V> {
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected TimestampExtractor timestampExtractor;
protected Topology.AutoOffsetReset resetPolicy;
private Consumed(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.timestampExtractor = timestampExtractor;
this.resetPolicy = resetPolicy;
}
/**
* Create an instance of {@link Consumed} from an existing instance.
* @param consumed the instance of {@link Consumed} to copy
*/
public Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde, consumed.valueSerde, consumed.timestampExtractor, consumed.resetPolicy);
}
/**
* Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
*
* @param keySerde the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde the value serde. If {@code null} the default value serde from config will be used
* @param timestampExtractor the timestamp extractor to be used. If {@code null} the default timestamp extractor from config will be used
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy);
}
/**
* Create an instance of {@link Consumed} with key and value {@link Serde}s.
*
* @param keySerde the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde the value serde. If {@code null} the default value serde from config will be used
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Consumed<>(keySerde, valueSerde, null, null);
}
/**
* Create an instance of {@link Consumed} with a {@link TimestampExtractor}.
*
* @param timestampExtractor the timestamp extractor to be used. If {@code null} the default timestamp extractor from config will be used
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
return new Consumed<>(null, null, timestampExtractor, null);
}
/**
* Create an instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
*
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, resetPolicy);
}
/**
* Configure the instance of {@link Consumed} with a key {@link Serde}.
*
* @param keySerde the key serde. If {@code null} the default key serde from config will be used
* @return this
*/
public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
this.keySerde = keySerde;
return this;
}
/**
* Configure the instance of {@link Consumed} with a value {@link Serde}.
*
* @param valueSerde the value serde. If {@code null} the default value serde from config will be used
* @return this
*/
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
this.valueSerde = valueSerde;
return this;
}
/**
* Configure the instance of {@link Consumed} with a {@link TimestampExtractor}.
*
* @param timestampExtractor the timestamp extractor to be used. If {@code null} the default timestamp extractor from config will be used
* @return this
*/
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) {
this.timestampExtractor = timestampExtractor;
return this;
}
/**
* Configure the instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
*
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @return this
*/
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
this.resetPolicy = resetPolicy;
return this;
}
}
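
To make the fluent surface above concrete, here is a small hedged sketch combining a with() factory with the withXxx() setters; the WallclockTimestampExtractor is just an example extractor and is not implied by this change.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class ConsumedFluentSketch {
    public static Consumed<String, Long> allOptionsSet() {
        // Start from the serde factory and layer the remaining options on top;
        // any option left unset falls back to the corresponding config default.
        return Consumed.with(Serdes.String(), Serdes.Long())
                .withTimestampExtractor(new WallclockTimestampExtractor())
                .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST);
    }
}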


@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
@ -35,6 +36,8 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.Collection;
import java.util.Collections;
import java.util.regex.Pattern;
/**
@ -66,17 +69,35 @@ public class StreamsBuilder {
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param topics the topic names; must contain at least one topic name
* @param topic the topic name; cannot be {@code null}
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final String... topics) {
return internalStreamsBuilder.stream(null, null, null, null, topics);
public synchronized <K, V> KStream<K, V> stream(final String topic) {
return stream(Collections.singleton(topic));
}
/**
* Create a {@link KStream} from the specified topics.
* The default {@link TimestampExtractor} and default key and value deserializers as specified in the
* {@link StreamsConfig config} are used.
* The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
* are defined by the options in {@link Consumed}.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param topic the topic name; cannot be {@code null}
* @param consumed the instance of {@link Consumed} used to define optional parameters
* @return a {@link KStream} for the specified topic
*/
public synchronized <K, V> KStream<K, V> stream(final String topic,
final Consumed<K, V> consumed) {
return stream(Collections.singleton(topic), consumed);
}
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
* deserializers as specified in the {@link StreamsConfig config} are used.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
@ -84,16 +105,34 @@ public class StreamsBuilder {
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
* offsets are available
* @param topics the topic names; must contain at least one topic name
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final String... topics) {
return internalStreamsBuilder.stream(offsetReset, null, null, null, topics);
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
return stream(topics, Consumed.<K, V>with(null, null, null, null));
}
/**
* Create a {@link KStream} from the specified topics.
* The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
* are defined by the options in {@link Consumed}.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param topics the topic names; must contain at least one topic name
* @param consumed the instance of {@link Consumed} used to define optional parameters
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics,
final Consumed<K, V> consumed) {
return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed));
}
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@ -110,13 +149,13 @@ public class StreamsBuilder {
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
return internalStreamsBuilder.stream(null, null, null, null, topicPattern);
return stream(topicPattern, Consumed.<K, V>with(null, null));
}
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@link TimestampExtractor} and default key and value deserializers as specified in the
* {@link StreamsConfig config} are used.
* The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
* are defined by the options in {@link Consumed}.
* <p>
* If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
* them and there is no ordering guarantee between records from different topics.
@ -125,226 +164,13 @@ public class StreamsBuilder {
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
* offsets are available
* @param topicPattern the pattern to match for topic names
* @param consumed the instance of {@link Consumed} used to define optional parameters
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final Pattern topicPattern) {
return internalStreamsBuilder.stream(offsetReset, null, null, null, topicPattern);
}
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
* {@link StreamsConfig config} are used.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topics the topic names; must contain at least one topic name
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
final Serde<V> valueSerde,
final String... topics) {
return internalStreamsBuilder.stream(null, null, keySerde, valueSerde, topics);
}
/**
* Create a {@link KStream} from the specified topics.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
* offsets are available
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topics the topic names; must contain at least one topic name
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String... topics) {
return internalStreamsBuilder.stream(offsetReset, null, keySerde, valueSerde, topics);
}
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to read this source {@link KStream}, if not specified the default
* serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topics the topic names; must contain at least one topic name
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String... topics) {
return internalStreamsBuilder.stream(null, timestampExtractor, keySerde, valueSerde, topics);
}
/**
* Create a {@link KStream} from the specified topics.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics
* if no valid committed offsets are available
* @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topics the topic names; must contain at least one topic name
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String... topics) {
return internalStreamsBuilder.stream(offsetReset, timestampExtractor, keySerde, valueSerde, topics);
}
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
* as specified in the {@link StreamsConfig config} are used.
* <p>
* If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
* them and there is no ordering guarantee between records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topicPattern the pattern to match for topic names
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Pattern topicPattern) {
return internalStreamsBuilder.stream(null, null, keySerde, valueSerde, topicPattern);
}
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
* them and there is no ordering guarantee between records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
* offsets are available
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topicPattern the pattern to match for topic names
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Pattern topicPattern) {
return internalStreamsBuilder.stream(offsetReset, null, keySerde, valueSerde, topicPattern);
}
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
* <p>
* If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
* them and there is no ordering guarantee between records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topicPattern the pattern to match for topic names
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Pattern topicPattern) {
return internalStreamsBuilder.stream(null, timestampExtractor, keySerde, valueSerde, topicPattern);
}
/**
* Create a {@link KStream} from the specified topic pattern.
* <p>
* If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
* them and there is no ordering guarantee between records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
* committed offsets are available
* @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valueSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topicPattern the pattern to match for topic names
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Pattern topicPattern) {
return internalStreamsBuilder.stream(offsetReset, timestampExtractor, keySerde, valueSerde, topicPattern);
public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern,
final Consumed<K, V> consumed) {
return internalStreamsBuilder.stream(topicPattern, new ConsumedInternal<>(consumed));
}
/**
@ -378,7 +204,7 @@ public class StreamsBuilder {
*/
public synchronized <K, V> KTable<K, V> table(final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(null, null, null, null, topic, queryableStoreName);
return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), queryableStoreName);
}
/**
@ -433,7 +259,30 @@ public class StreamsBuilder {
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic) {
return internalStreamsBuilder.table(null, null, null, null, topic, (String) null);
return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), null);
}
/**
* Create a {@link KTable} for the specified topic.
* The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, key and value deserializers
* are defined by the options in {@link Consumed}.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that the store name may not be queryable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
*
* @param topic the topic name; cannot be {@code null}
* @param consumed the instance of {@link Consumed} used to define optional parameters
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic,
final Consumed<K, V> consumed) {
return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), null);
}
/**
@ -464,13 +313,13 @@ public class StreamsBuilder {
* offsets are available
* @param topic the topic name; cannot be {@code null}
* @param queryableStoreName the state store name; if {@code null} this is the equivalent of
* {@link #table(Topology.AutoOffsetReset, String)}
* {@link #table(String, Consumed)}
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(offsetReset, null, null, null, topic, queryableStoreName);
return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(offsetReset)), queryableStoreName);
}
/**
@ -510,29 +359,6 @@ public class StreamsBuilder {
return internalStreamsBuilder.table(offsetReset, null, null, null, topic, storeSupplier);
}
/**
* Create a {@link KTable} for the specified topic.
* The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
* @param topic the topic name; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
final String topic) {
return internalStreamsBuilder.table(offsetReset, null, null, null, topic, (String) null);
}
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy and default key and value deserializers
@ -567,7 +393,7 @@ public class StreamsBuilder {
public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(null, timestampExtractor, null, null, topic, queryableStoreName);
return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(timestampExtractor)), queryableStoreName);
}
/**
@ -606,7 +432,8 @@ public class StreamsBuilder {
final TimestampExtractor timestampExtractor,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(offsetReset, timestampExtractor, null, null, topic, queryableStoreName);
final Consumed<K, V> consumed = Consumed.<K, V>with(offsetReset).withTimestampExtractor(timestampExtractor);
return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), queryableStoreName);
}
/**
@ -646,7 +473,9 @@ public class StreamsBuilder {
final Serde<V> valueSerde,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, queryableStoreName);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(keySerde, valueSerde, null, null),
queryableStoreName);
}
/**
@ -689,32 +518,6 @@ public class StreamsBuilder {
return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier);
}
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valueSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
final Serde<V> valueSerde,
final String topic) {
return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, (String) null);
}
/**
* Create a {@link KTable} for the specified topic.
* Input {@link KeyValue records} with {@code null} key will be dropped.
@ -753,7 +556,8 @@ public class StreamsBuilder {
final Serde<V> valueSerde,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, queryableStoreName);
final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, null, offsetReset);
return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
}
/**
@ -795,7 +599,8 @@ public class StreamsBuilder {
final Serde<V> valueSerde,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(null, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null);
return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
}
/**
@ -839,36 +644,9 @@ public class StreamsBuilder {
final Serde<V> valueSerde,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
}
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valueSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String topic) {
return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, (String) null);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, offsetReset),
queryableStoreName);
}
/**
@ -942,7 +720,28 @@ public class StreamsBuilder {
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.globalTable(null, null, null, topic, queryableStoreName);
return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), queryableStoreName);
}
/**
* Create a {@link GlobalKTable} for the specified topic.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that the store name may not be queryable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
* @param topic the topic name; cannot be {@code null}
* @param consumed the instance of {@link Consumed} used to define optional parameters
* @return a {@link GlobalKTable} for the specified topic
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final Consumed<K, V> consumed) {
return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), null);
}
/**
@ -962,7 +761,7 @@ public class StreamsBuilder {
* @return a {@link GlobalKTable} for the specified topic
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
return internalStreamsBuilder.globalTable(null, null, null, topic, null);
return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), null);
}
/**
@ -1002,7 +801,9 @@ public class StreamsBuilder {
final TimestampExtractor timestampExtractor,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.globalTable(keySerde, valueSerde, timestampExtractor, topic, queryableStoreName);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null),
queryableStoreName);
}
/**
@ -1074,33 +875,9 @@ public class StreamsBuilder {
final Serde<V> valueSerde,
final String topic,
final String queryableStoreName) {
return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, queryableStoreName);
}
/**
* Create a {@link GlobalKTable} for the specified topic.
* The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valueSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @return a {@link GlobalKTable} for the specified topic
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valueSerde,
final String topic) {
return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, null);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(Consumed.with(keySerde, valueSerde)),
queryableStoreName);
}
/**


@ -37,7 +37,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
* For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
* in the stream.
* <p>
* A {@code KStream} is either {@link StreamsBuilder#stream(String...) defined from one or multiple Kafka topics} that
* A {@code KStream} is either {@link StreamsBuilder#stream(String) defined from one or multiple Kafka topics} that
* are consumed message by message or the result of a {@code KStream} transformation.
* A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code KStream}.
* <p>
@ -52,7 +52,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
* @param <V> Type of values
* @see KTable
* @see KGroupedStream
* @see StreamsBuilder#stream(String...)
* @see StreamsBuilder#stream(String)
*/
@InterfaceStability.Evolving
public interface KStream<K, V> {
@ -654,7 +654,7 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
* {@link org.apache.kafka.streams.StreamsBuilder#stream(String...)
* {@link StreamsBuilder#stream(String)
* StreamsBuilder#stream(someTopicName)}.
*
* @param topic the topic name
@ -669,7 +669,7 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(StreamPartitioner, someTopicName)} and
* {@link StreamsBuilder#stream(String...) StreamsBuilder#stream(someTopicName)}.
* {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
public class ConsumedInternal<K, V> extends Consumed<K, V> {
public ConsumedInternal(final Consumed<K, V> consumed) {
super(consumed);
}
public ConsumedInternal(final Serde<K> keySerde,
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset offsetReset) {
this(Consumed.with(keySerde, valSerde, timestampExtractor, offsetReset));
}
public ConsumedInternal() {
this(Consumed.<K, V>with(null, null));
}
public Serde<K> keySerde() {
return keySerde;
}
public Serde<V> valueSerde() {
return valueSerde;
}
public TimestampExtractor timestampExtractor() {
return timestampExtractor;
}
public Topology.AutoOffsetReset offsetResetPolicy() {
return resetPolicy;
}
}
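
A brief note on the design: ConsumedInternal exists so that the public Consumed can keep only factory methods and fluent setters, while the DSL builders read the protected fields back through getters on this internal subclass. A hedged sketch of that wrapping, with illustrative values only:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;

public class ConsumedInternalSketch {
    public static void main(final String[] args) {
        // Wrap a user-supplied Consumed so its options become readable inside the builders.
        final ConsumedInternal<String, String> internal =
                new ConsumedInternal<>(Consumed.with(Serdes.String(), Serdes.String()));

        System.out.println(internal.keySerde());          // the String serde instance
        System.out.println(internal.offsetResetPolicy()); // null, i.e. use the config default
    }
}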


@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
@ -44,45 +45,45 @@ public class InternalStreamsBuilder {
this.internalTopologyBuilder = internalTopologyBuilder;
}
public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valSerde,
final String... topics) {
public <K, V> KStream<K, V> stream(final Collection<String> topics,
final ConsumedInternal<K, V> consumed) {
final String name = newName(KStreamImpl.SOURCE_NAME);
internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
consumed.timestampExtractor(),
consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
topics.toArray(new String[topics.size()]));
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Pattern topicPattern) {
public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
final String name = newName(KStreamImpl.SOURCE_NAME);
internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
consumed.timestampExtractor(),
consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
topicPattern);
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
@SuppressWarnings("unchecked")
public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final String queryableStoreName) {
final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
keySerde,
valSerde,
consumed.keySerde(),
consumed.valueSerde(),
false,
Collections.<String, String>emptyMap(),
true);
return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
return doTable(consumed, topic, storeSupplier, queryableStoreName != null);
}
public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
@ -92,13 +93,10 @@ public class InternalStreamsBuilder {
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
return doTable(new ConsumedInternal<>(keySerde, valSerde, timestampExtractor, offsetReset), topic, storeSupplier, true);
}
private <K, V> KTable<K, V> doTable(final Topology.AutoOffsetReset offsetReset,
final Serde<K> keySerde,
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
private <K, V> KTable<K, V> doTable(final ConsumedInternal<K, V> consumed,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier,
final boolean isQueryable) {
@ -106,13 +104,16 @@ public class InternalStreamsBuilder {
final String name = newName(KTableImpl.SOURCE_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
internalTopologyBuilder.addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
valSerde == null ? null : valSerde.deserializer(),
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
source,
consumed.timestampExtractor(),
consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
topic);
internalTopologyBuilder.addProcessor(name, processorSupplier, source);
final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeSupplier.name(), isQueryable);
internalTopologyBuilder.addStateStore(storeSupplier, name);
internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
@ -120,15 +121,13 @@ public class InternalStreamsBuilder {
return kTable;
}
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
final String topic,
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
final ConsumedInternal<K, V> consumed,
final String queryableStoreName) {
final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
keySerde,
valSerde,
return doGlobalTable(consumed, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
consumed.keySerde(),
consumed.valueSerde(),
false,
Collections.<String, String>emptyMap(),
true));
@ -138,13 +137,11 @@ public class InternalStreamsBuilder {
final Serde<V> valSerde,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
return doGlobalTable(new ConsumedInternal<>(keySerde, valSerde, null, null), topic, storeSupplier);
}
@SuppressWarnings("unchecked")
private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
private <K, V> GlobalKTable<K, V> doGlobalTable(final ConsumedInternal<K, V> consumed,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@ -153,10 +150,17 @@ public class InternalStreamsBuilder {
final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
internalTopologyBuilder.addGlobalStore(storeSupplier,
sourceName,
consumed.timestampExtractor(),
keyDeserializer,
valueDeserializer,
topic,
processorName,
tableSource);
return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
}
@ -199,5 +203,4 @@ public class InternalStreamsBuilder {
internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
valueDeserializer, topic, processorName, stateUpdateSupplier);
}
}


@ -380,14 +380,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner, String topic) {
return through(topic, Produced.with(keySerde, valSerde, partitioner));
}
@Override
public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
to(topic, produced);
return builder.stream(null, new FailOnInvalidTimestamp(), produced.keySerde(), produced.valueSerde(), topic);
return builder.stream(Collections.singleton(topic),
new ConsumedInternal<>(produced.keySerde(),
produced.valueSerde(),
new FailOnInvalidTimestamp(),
null));
}
@Override


@ -340,7 +340,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
to(keySerde, valSerde, partitioner, topic);
return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, internalStoreName);
return builder.table(topic,
new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
internalStoreName);
}
@Override


@ -374,7 +374,7 @@ public class KafkaStreamsTest {
final String topic = "input";
CLUSTER.createTopic(topic);
builder.stream(Serdes.String(), Serdes.String(), topic)
builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.foreach(new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {


@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
@ -27,7 +26,9 @@ import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@ -41,7 +42,7 @@ public class StreamsBuilderTest {
@Test(expected = TopologyException.class)
public void testFrom() {
builder.stream("topic-1", "topic-2");
builder.stream(Arrays.asList("topic-1", "topic-2"));
builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
}
@ -109,12 +110,12 @@ public class StreamsBuilderTest {
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
builder.stream();
builder.stream(Collections.<String>emptyList());
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
builder.stream(Serdes.String(), Serdes.String(), null, null);
builder.stream(Arrays.<String>asList(null, null));
}
// TODO: these two static functions are added because some non-TopologyBuilder unit tests need to access the internal topology builder,


@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -101,7 +102,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
stream = builder.stream(inputStream, Consumed.with(Serdes.String(), Serdes.Long()));
table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
foreachAction = new ForeachAction<String, String>() {
@Override


@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -100,7 +101,7 @@ public class KStreamAggregationDedupIntegrationTest {
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
groupedStream = stream
.groupBy(
mapper,

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -37,6 +38,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
@ -111,12 +113,11 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
groupedStream = stream
.groupBy(
mapper,
Serdes.String(),
Serdes.String());
Serialized.with(Serdes.String(), Serdes.String()));
reducer = new Reducer<String>() {
@Override
@ -208,7 +209,7 @@ public class KStreamAggregationIntegrationTest {
return windowedKey.key() + "@" + windowedKey.window().start();
}
})
.to(Serdes.String(), Serdes.String(), outputTopic);
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
startStreams();
@ -513,7 +514,8 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, Long> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(11);
builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
.toStream()
@ -536,6 +538,7 @@ public class KStreamAggregationIntegrationTest {
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
}
@SuppressWarnings("deprecation")
@Test
public void shouldReduceSessionWindows() throws Exception {
final long sessionGap = 1000L; // something to do with time
@ -600,7 +603,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, String> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(11);
final String userSessionsStore = "UserSessionsStore";
builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.reduce(new Reducer<String>() {
@Override
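
The hunks above show the full KIP-182 pattern side by side: Consumed for the source, Serialized for the repartition serdes, Produced for the sink. A small end-to-end sketch with hypothetical topic and store names, using the count(String storeName) overload these tests still rely on:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Serialized;

    public class ConsumedSerializedProducedSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> events = builder.stream("events",
                    Consumed.with(Serdes.String(), Serdes.String()));              // source serdes
            final KTable<String, Long> counts = events
                    .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) // repartition serdes
                    .count("counts-store");                                        // store name, as in the tests above
            counts.toStream()
                    .to("event-counts", Produced.with(Serdes.String(), Serdes.Long())); // sink serdes
        }
    }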

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -197,7 +198,7 @@ public class KStreamKTableJoinIntegrationTest {
//
// Because this is a KStream ("record stream"), multiple records for the same user will be
// considered as separate click-count events, each of which will be added to the total count.
final KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, userClicksTopic);
final KStream<String, Long> userClicksStream = builder.stream(userClicksTopic, Consumed.with(Serdes.String(), Serdes.Long()));
// This KTable contains information such as "alice" -> "europe".
//
// Because this is a KTable ("changelog stream"), only the latest value (here: region) for a

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -100,9 +101,9 @@ public class KStreamRepartitionJoinTest {
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.Long(), Serdes.Integer()));
streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
}

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -186,9 +187,10 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
final KStream<String, String> pattern2Stream = builder.stream(Topology.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.EARLIEST));
final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.LATEST));
final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(topicY, topicZ));
pattern1Stream.to(stringSerde, stringSerde, outputTopic);
pattern2Stream.to(stringSerde, stringSerde, outputTopic);
@ -262,10 +264,9 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
public void shouldThrowExceptionOverlappingTopic() throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
//NOTE this would realistically get caught when building topology, the test is for completeness
builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
try {
builder.stream(Topology.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1), Consumed.with(Topology.AutoOffsetReset.LATEST));
fail("Should have thrown TopologyException");
} catch (final TopologyException expected) {
// do nothing
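
For pattern subscriptions the offset reset policy now rides on Consumed instead of being the leading argument. A minimal sketch with a hypothetical pattern; serdes fall back to the defaults configured in StreamsConfig:

    import java.util.regex.Pattern;

    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;

    public class PatternResetSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // The per-source reset policy is carried by Consumed rather than passed before the pattern.
            final KStream<String, String> earliest = builder.stream(
                    Pattern.compile("metrics-.*"),
                    Consumed.<String, String>with(Topology.AutoOffsetReset.EARLIEST));
        }
    }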

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyValue;
@ -191,7 +192,7 @@ public class QueryableStateIntegrationTest {
private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<String> stringSerde = Serdes.String();
final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
final KGroupedStream<String, String> groupedByWord = textLines
.flatMapValues(new ValueMapper<String, Iterable<String>>() {

View File

@ -279,7 +279,7 @@ public class RegexSourceIntegrationTest {
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
@ -44,7 +45,7 @@ public class AbstractStreamTest {
final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
final String topicName = "topic";
ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(Serdes.Integer(), Serdes.String(), topicName));
ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())));
stream.randomFilter().process(processor);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
@ -54,7 +55,7 @@ public class GlobalKTableJoinsTest {
public void setUp() throws Exception {
stateDir = TestUtils.tempDirectory();
global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic);
stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));
keyValueMapper = new KeyValueMapper<String, String, String>() {
@Override
public String apply(final String key, final String value) {

View File

@ -16,9 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@ -58,6 +57,7 @@ public class InternalStreamsBuilderTest {
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
private KStreamTestDriver driver = null;
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
@Before
public void setUp() {
@ -103,9 +103,9 @@ public class InternalStreamsBuilderTest {
final String topic1 = "topic-1";
final String topic2 = "topic-2";
final String topic3 = "topic-3";
final KStream<String, String> source1 = builder.stream(null, null, null, null, topic1);
final KStream<String, String> source2 = builder.stream(null, null, null, null, topic2);
final KStream<String, String> source3 = builder.stream(null, null, null, null, topic3);
final KStream<String, String> source1 = builder.stream(Collections.singleton(topic1), consumed);
final KStream<String, String> source2 = builder.stream(Collections.singleton(topic2), consumed);
final KStream<String, String> source3 = builder.stream(Collections.singleton(topic3), consumed);
final KStream<String, String> processedSource1 =
source1.mapValues(new ValueMapper<String, String>() {
@Override
@ -133,8 +133,8 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
KTable table1 = builder.table(null, null, null, null, "topic1", "table1");
KTable table2 = builder.table(null, null, null, null, "topic2", (String) null);
KTable table1 = builder.table("topic1", consumed, "table1");
KTable table2 = builder.table("topic2", consumed, null);
final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
@ -152,7 +152,7 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldBuildSimpleGlobalTableTopology() throws Exception {
builder.globalTable(null, null, null, "table", "globalTable");
builder.globalTable("table", consumed, "globalTable");
final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
final List<StateStore> stateStores = topology.globalStateStores();
@ -173,16 +173,16 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
builder.globalTable(null, null, null, "table", "globalTable");
builder.globalTable(null, null, null, "table2", "globalTable2");
builder.globalTable("table", consumed, "globalTable");
builder.globalTable("table2", consumed, "globalTable2");
doBuildGlobalTopologyWithAllGlobalTables();
}
@Test
public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
builder.globalTable(null, null, null, "table", null);
builder.globalTable(null, null, null, "table2", null);
builder.globalTable("table", consumed, null);
builder.globalTable("table2", consumed, null);
doBuildGlobalTopologyWithAllGlobalTables();
}
@ -191,10 +191,10 @@ public class InternalStreamsBuilderTest {
public void shouldAddGlobalTablesToEachGroup() throws Exception {
final String one = "globalTable";
final String two = "globalTable2";
final GlobalKTable<String, String> globalTable = builder.globalTable(null, null, null, "table", one);
final GlobalKTable<String, String> globalTable2 = builder.globalTable(null, null, null, "table2", two);
final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, one);
final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, two);
builder.table(null, null, null, null, "not-global", "not-global");
builder.table("not-global", consumed, "not-global");
final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
@Override
@ -203,9 +203,9 @@ public class InternalStreamsBuilderTest {
}
};
final KStream<String, String> stream = builder.stream(null, null, null, null, "t1");
final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
final KStream<String, String> stream2 = builder.stream(null, null, null, null, "t2");
final KStream<String, String> stream2 = builder.stream(Collections.singleton("t2"), consumed);
stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);
final Map<Integer, Set<String>> nodeGroups = builder.internalTopologyBuilder.nodeGroups();
@ -225,9 +225,9 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
final KStream<String, String> playEvents = builder.stream(null, null, null, null, "events");
final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
final KTable<String, String> table = builder.table(null, null, null, null, "table-topic", "table-store");
final KTable<String, String> table = builder.table("table-topic", consumed, "table-store");
assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
@ -239,8 +239,8 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldAddTopicToEarliestAutoOffsetResetList() {
final String topicName = "topic-1";
builder.stream(AutoOffsetReset.EARLIEST, null, null, null, topicName);
final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST));
builder.stream(Collections.singleton(topicName), consumed);
assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@ -250,7 +250,8 @@ public class InternalStreamsBuilderTest {
public void shouldAddTopicToLatestAutoOffsetResetList() {
final String topicName = "topic-1";
builder.stream(AutoOffsetReset.LATEST, null, null, null, topicName);
final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST));
builder.stream(Collections.singleton(topicName), consumed);
assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@ -260,8 +261,7 @@ public class InternalStreamsBuilderTest {
public void shouldAddTableToEarliestAutoOffsetResetList() {
final String topicName = "topic-1";
final String storeName = "test-store";
builder.table(AutoOffsetReset.EARLIEST, null, null, null, topicName, storeName);
builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), storeName);
assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@ -272,7 +272,7 @@ public class InternalStreamsBuilderTest {
final String topicName = "topic-1";
final String storeName = "test-store";
builder.table(AutoOffsetReset.LATEST, null, null, null, topicName, storeName);
builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), storeName);
assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@ -282,9 +282,8 @@ public class InternalStreamsBuilderTest {
public void shouldNotAddTableToOffsetResetLists() {
final String topicName = "topic-1";
final String storeName = "test-store";
final Serde<String> stringSerde = Serdes.String();
builder.table(null, null, stringSerde, stringSerde, topicName, storeName);
builder.table(topicName, consumed, storeName);
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@ -295,7 +294,7 @@ public class InternalStreamsBuilderTest {
final Pattern topicPattern = Pattern.compile("topic-\\d");
final String topic = "topic-5";
builder.stream(null, null, null, null, topicPattern);
builder.stream(topicPattern, consumed);
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topic).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topic).matches());
@ -307,7 +306,7 @@ public class InternalStreamsBuilderTest {
final Pattern topicPattern = Pattern.compile("topic-\\d+");
final String topicTwo = "topic-500000";
builder.stream(AutoOffsetReset.EARLIEST, null, null, null, topicPattern);
builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)));
assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
@ -318,7 +317,7 @@ public class InternalStreamsBuilderTest {
final Pattern topicPattern = Pattern.compile("topic-\\d+");
final String topicTwo = "topic-1000000";
builder.stream(AutoOffsetReset.LATEST, null, null, null, topicPattern);
builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)));
assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
@ -326,28 +325,30 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
builder.stream(null, null, null, null, "topic");
builder.stream(Collections.singleton("topic"), consumed);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
public void shouldUseProvidedTimestampExtractor() throws Exception {
builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
builder.stream(Collections.singleton("topic"), consumed);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
@Test
public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
builder.table(null, null, null, null, "topic", "store");
builder.table("topic", consumed, "store");
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
builder.table("topic", consumed, "store");
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
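
A per-source TimestampExtractor is likewise supplied through Consumed, as the ConsumedInternal-based tests above do with MockTimestampExtractor. A minimal sketch using the built-in WallclockTimestampExtractor and a hypothetical "clicks" topic:

    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class TimestampExtractorSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Only the extractor is overridden here; key/value serdes come from the configured defaults.
            final KStream<String, String> clicks = builder.stream("clicks",
                    Consumed.<String, String>with(new WallclockTimestampExtractor()));
        }
    }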

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
@ -68,7 +69,7 @@ public class KGroupedStreamImplTest {
@Before
public void before() {
final KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), TOPIC);
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
}
@ -87,17 +88,20 @@ public class KGroupedStreamImplTest {
groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier<KeyValueStore>) null);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreSupplierOnCount() throws Exception {
groupedStream.count((StateStoreSupplier<KeyValueStore>) null);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreSupplierOnWindowedCount() throws Exception {
groupedStream.count(TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
@ -168,6 +172,7 @@ public class KGroupedStreamImplTest {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
@ -383,6 +388,7 @@ public class KGroupedStreamImplTest {
groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception {
groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>) null);
@ -448,6 +454,7 @@ public class KGroupedStreamImplTest {
}, SessionWindows.with(10), Serdes.String(), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@ -473,6 +480,7 @@ public class KGroupedStreamImplTest {
groupedStream.count(SessionWindows.with(90), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws Exception {
groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
@ -66,7 +67,7 @@ public class KStreamBranchTest {
KStream<Integer, String>[] branches;
MockProcessorSupplier<Integer, String>[] processors;
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
branches = stream.branch(isEven, isMultipleOfThree, isOdd);
assertEquals(3, branches.length);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
@ -50,7 +51,7 @@ public class KStreamFilterTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filter(isMultipleOfThree).process(processor);
driver.setUp(builder);
@ -70,7 +71,7 @@ public class KStreamFilterTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filterNot(isMultipleOfThree).process(processor);
driver.setUp(builder);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
@ -59,7 +60,7 @@ public class KStreamFlatMapTest {
MockProcessorSupplier<String, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.flatMap(mapper).process(processor);
driver.setUp(builder);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
@ -57,7 +58,7 @@ public class KStreamFlatMapValuesTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(Serdes.Integer(), Serdes.Integer(), topicName);
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.flatMapValues(mapper).process(processor);
driver.setUp(builder);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -70,7 +71,7 @@ public class KStreamForeachTest {
// When
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
stream.foreach(action);
// Then

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
@ -44,6 +45,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -60,6 +62,7 @@ public class KStreamImplTest {
final private Serde<Integer> intSerde = Serdes.Integer();
private KStream<String, String> testStream;
private StreamsBuilder builder;
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
@ -74,9 +77,9 @@ public class KStreamImplTest {
public void testNumProcesses() {
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
KStream<String, String> source2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
KStream<String, String> stream1 =
source1.filter(new Predicate<String, String>() {
@ -171,8 +174,9 @@ public class KStreamImplTest {
@Test
public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
KStream<String, String> stream2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
stream1.to("topic-5");
stream2.through("topic-6");
@ -189,7 +193,7 @@ public class KStreamImplTest {
public void shouldSendDataThroughTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
final KStream<String, String> stream = builder.stream(input, consumed);
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
@ -202,10 +206,10 @@ public class KStreamImplTest {
public void shouldSendDataToTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
final KStream<String, String> stream = builder.stream(input, consumed);
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
stream.to("to-topic", Produced.with(stringSerde, stringSerde));
builder.stream(stringSerde, stringSerde, "to-topic").process(processorSupplier);
builder.stream("to-topic", consumed).process(processorSupplier);
driver.setUp(builder);
driver.process(input, "e", "f");
@ -249,7 +253,8 @@ public class KStreamImplTest {
@Test
public void testToWithNullValueSerdeDoesntNPE() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), consumed);
inputStream.to(stringSerde, null, "output");
}
@ -424,7 +429,7 @@ public class KStreamImplTest {
@Test
public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah");
final KTable<String, String> table = builder.table("blah", consumed);
try {
testStream.leftJoin(table,
MockValueJoiner.TOSTRING_JOINER,
@ -437,7 +442,7 @@ public class KStreamImplTest {
@Test
public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah");
final KTable<String, String> table = builder.table("blah", consumed);
try {
testStream.join(table,
MockValueJoiner.TOSTRING_JOINER,

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.JoinWindows;
@ -53,6 +54,7 @@ public class KStreamKStreamJoinTest {
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
@Before
public void setUp() throws IOException {
@ -71,8 +73,8 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
@ -172,14 +174,14 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.outerJoin(stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
Joined.with(intSerde, stringSerde, stringSerde));
joined.process(processor);
Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
@ -275,8 +277,8 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(stream2,
MockValueJoiner.TOSTRING_JOINER,
@ -505,8 +507,8 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(stream2,
MockValueJoiner.TOSTRING_JOINER,
@ -619,8 +621,8 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(stream2,
MockValueJoiner.TOSTRING_JOINER,

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.JoinWindows;
@ -52,6 +53,7 @@ public class KStreamKStreamLeftJoinTest {
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
@Before
@ -71,8 +73,8 @@ public class KStreamKStreamLeftJoinTest {
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.leftJoin(stream2,
MockValueJoiner.TOSTRING_JOINER,
@ -162,8 +164,8 @@ public class KStreamKStreamLeftJoinTest {
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.leftJoin(stream2,
MockValueJoiner.TOSTRING_JOINER,

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KStream;
@ -66,7 +67,7 @@ public class KStreamKTableJoinTest {
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(intSerde, stringSerde, topic1);
stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KStream;
@ -67,7 +68,7 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(intSerde, stringSerde, topic1);
stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
@ -52,7 +53,7 @@ public class KStreamMapTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
MockProcessorSupplier<String, Integer> processor;
processor = new MockProcessorSupplier<>();

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
@ -53,7 +54,7 @@ public class KStreamMapValuesTest {
KStream<Integer, String> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.stream(intSerde, stringSerde, topicName);
stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
stream.mapValues(mapper).process(processor);
driver.setUp(builder);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -44,7 +45,7 @@ public class KStreamPeekTest {
@Test
public void shouldObserveStreamElements() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
@ -63,7 +64,7 @@ public class KStreamPeekTest {
@Test
public void shouldNotAllowNullAction() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
try {
stream.peek(null);
fail("expected null action to throw NPE");

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
@ -61,7 +62,7 @@ public class KStreamSelectKeyTest {
final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
final int[] expectedValues = new int[]{1, 2, 3};
KStream<String, Integer> stream = builder.stream(stringSerde, integerSerde, topicName);
KStream<String, Integer> stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>();

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
@ -75,7 +76,7 @@ public class KStreamTransformTest {
final int[] expectedKeys = {1, 10, 100, 1000};
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
stream.transform(transformerSupplier).process(processor);
driver.setUp(builder);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
@ -78,7 +79,7 @@ public class KStreamTransformValuesTest {
KStream<Integer, Integer> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.stream(intSerde, intSerde, topicName);
stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
stream.transformValues(valueTransformerSupplier).process(processor);
driver.setUp(builder);

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@ -59,7 +60,7 @@ public class KStreamWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
KStream<String, String> stream1 = builder.stream(topic1, Consumed.with(strSerde, strSerde));
KTable<Windowed<String>, String> table2 =
stream1.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT,
@ -151,7 +152,7 @@ public class KStreamWindowAggregateTest {
String topic1 = "topic1";
String topic2 = "topic2";
KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
KStream<String, String> stream1 = builder.stream(topic1, Consumed.with(strSerde, strSerde));
KTable<Windowed<String>, String> table1 =
stream1.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT,
@ -162,7 +163,7 @@ public class KStreamWindowAggregateTest {
MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
KStream<String, String> stream2 = builder.stream(topic2, Consumed.with(strSerde, strSerde));
KTable<Windowed<String>, String> table2 =
stream2.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT,

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -586,7 +587,7 @@ public class SimpleBenchmark {
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
source.process(new ProcessorSupplier<Integer, byte[]>() {
@Override
@ -625,7 +626,7 @@ public class SimpleBenchmark {
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
source.process(new ProcessorSupplier<Integer, byte[]>() {
@ -729,7 +730,7 @@ public class SimpleBenchmark {
} else {
builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
}
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
source.process(new ProcessorSupplier<Integer, byte[]>() {
@Override

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -282,8 +283,9 @@ public class YahooBenchmark {
projectedEventDeserializer.configure(serdeProps, false);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(),
Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), eventsTopic);
final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
Consumed.with(Serdes.String(),
Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
campaignsTopic, "campaign-state");

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.StateStore;
@ -333,7 +334,7 @@ public class StandbyTaskTest {
restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
builder.stream(null, null, null, null, "topic").groupByKey().count("my-store");
builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count("my-store");
final StreamsConfig config = createConfig(baseDir);
final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.TaskId;
@ -86,6 +87,7 @@ public class StreamThreadTest {
private final String stateDir = TestUtils.tempDirectory().getPath();
private final StateDirectory stateDirectory = new StateDirectory("applicationId", stateDir, mockTime);
private StreamsMetadataState streamsMetadataState;
private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
@Before
public void setUp() throws Exception {
@ -743,8 +745,8 @@ public class StreamThreadTest {
@Test
public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
internalStreamsBuilder.stream(null, null, null, null, "t2").groupByKey().count("count-two");
internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
internalStreamsBuilder.stream(Collections.singleton("t2"), consumed).groupByKey().count("count-two");
final StreamThread thread = createStreamThread(clientId, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@ -1018,7 +1020,7 @@ public class StreamThreadTest {
@Test
public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
final StreamThread thread = createStreamThread(clientId, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@ -1070,7 +1072,7 @@ public class StreamThreadTest {
@Test
public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
final StreamThread thread = createStreamThread(clientId, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -45,7 +46,7 @@ public class ShutdownDeadlockTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "shouldNotDeadlock");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), topic);
final KStream<String, String> source = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
source.foreach(new ForeachAction<String, String>() {
@Override

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -106,7 +107,7 @@ public class SmokeTestClient extends SmokeTestUtil {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde));
source.to(stringSerde, intSerde, "echo");
KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override