KAFKA-4936: Add dynamic routing in Streams (#5018)

implements KIP-303

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Guozhang Wang 2018-05-30 11:54:53 -07:00 committed by Matthias J. Sax
parent f8dfbb067c
commit f33e9a346e
47 changed files with 551 additions and 286 deletions

View File

@ -3023,7 +3023,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<li>KStream -&gt; void</li>
</ul>
</td>
<td><p class="first"><strong>Terminal operation.</strong> Write the records to a Kafka topic.
<td><p class="first"><strong>Terminal operation.</strong> Write the records to Kafka topic(s).
(<a class="reference external" href="../../../javadoc/org/apache/kafka/streams/kstream/KStream.html#to(java.lang.String)">KStream details</a>)</p>
<p>When to provide serdes explicitly:</p>
<ul class="simple">
@ -3037,6 +3037,8 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<p>A variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to specify how the data is produced by using a <code class="docutils literal"><span class="pre">Produced</span></code>
instance to specify, for example, a <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> that gives you control over
how output records are distributed across the partitions of the output topic.</p>
<p>Another variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to dynamically choose which topic to send to for each record via a <code class="docutils literal"><span class="pre">TopicNameExtractor</span></code>
instance.</p>
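A minimal sketch of this variant (the derived topic names are illustrative; every topic that can be produced this way must be created ahead of time, since Kafka Streams will not create them):

KStream<String, String> stream = ...;

// Route each record to a topic derived from the input topic and the record key, e.g. "input-a";
// the extractor also receives the record's context metadata.
stream.to((key, value, recordContext) -> recordContext.topic() + "-" + key);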
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span>

View File

@ -384,7 +384,8 @@
<li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
<code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
<li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic.</li>
as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
to dynamically determine, for each record received from the upstream processor, which Kafka topic to write to; a sketch follows this list).</li>
</ul>
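A minimal sketch of that dynamic variant (the topic names are hypothetical and must be pre-created, since Kafka Streams will not create them):

// Instead of the fixed "sink-topic", pick the output topic per record, here based on
// whether the forwarded value is a tombstone.
topology.addSink("Sink",
    (key, value, recordContext) -> value == null ? "sink-tombstone-topic" : "sink-topic",
    "Process");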
<p>In this topology, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
upstream processor of the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched record from

View File

@ -136,6 +136,13 @@
The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
Forwarding based on child index is not supported in the new API any longer.
</p>
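For example, a minimal sketch inside a Processor's process() method (the child processor name "my-child", and the key, value, and timestamp variables, are assumed from the surrounding processor):

// Forward to a specific downstream processor by name and override the output record's timestamp.
context.forward(key, value, To.child("my-child").withTimestamp(timestamp));
// Forward to all downstream processors, keeping the input record's timestamp.
context.forward(key, value, To.all());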
<p>
We have added support for dynamically routing records to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> topic name, such that for each record received from the upstream processor, the library dynamically determines which Kafka topic to write to
based on the record's key and value, as well as its record context. Note that all the Kafka topics that may possibly be used are still considered user topics and hence are required to be pre-created. In addition, we have modified the
<code>StreamPartitioner</code> interface to add a topic name parameter, since the topic name may not be known beforehand; users who have customized implementations of this interface will need to update their code when upgrading their application
to use Kafka Streams 2.0.0.
</p>
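For example, an existing custom partitioner would be updated roughly as follows (the class name and its routing logic are hypothetical; only the added topic parameter comes from the new interface):

import org.apache.kafka.streams.processor.StreamPartitioner;

public class KeyHashPartitioner implements StreamPartitioner<String, String> {

    // Prior to 2.0.0 the signature was partition(key, value, numPartitions); the topic name is now
    // passed per record because, with dynamic routing, it may only be known at send time.
    @Override
    public Integer partition(final String topic, final String key, final String value, final int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}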
<p>
<a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.

View File

@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@ -411,7 +412,8 @@ public class Topology {
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return itself
* @throws TopologyException itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@ -443,7 +445,8 @@ public class Topology {
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@ -471,7 +474,8 @@ public class Topology {
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@ -501,7 +505,8 @@ public class Topology {
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
@ -516,6 +521,130 @@ public class Topology {
return this;
}
/**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}.
* The topics that it may ever send to should be pre-created.
* The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link StreamsConfig stream configuration}.
*
* @param name the unique name of the sink
* @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and dynamically write to topics
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public synchronized <K, V> Topology addSink(final String name,
final TopicNameExtractor<K, V> topicExtractor,
final String... parentNames) {
internalTopologyBuilder.addSink(name, topicExtractor, null, null, null, parentNames);
return this;
}
/**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor},
* using the supplied partitioner.
* The topics that it may ever send to should be pre-created.
* The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link StreamsConfig stream configuration}.
* <p>
* The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
* the partitions of the Kafka topic chosen for each record.
* Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
* stores} in its processors.
* In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
* records among partitions using Kafka's default partitioning logic.
*
* @param name the unique name of the sink
* @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record
* @param partitioner the function that should be used to determine the partition for each record processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and dynamically write to topics
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public synchronized <K, V> Topology addSink(final String name,
final TopicNameExtractor<K, V> topicExtractor,
final StreamPartitioner<? super K, ? super V> partitioner,
final String... parentNames) {
internalTopologyBuilder.addSink(name, topicExtractor, null, null, partitioner, parentNames);
return this;
}
/**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}.
* The topics that it may ever send to should be pre-created.
* The sink will use the specified key and value serializers.
*
* @param name the unique name of the sink
* @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record
* @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
* should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
* {@link StreamsConfig stream configuration}
* @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link StreamsConfig stream configuration}
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and dynamically write to topics
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public synchronized <K, V> Topology addSink(final String name,
final TopicNameExtractor<K, V> topicExtractor,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final String... parentNames) {
internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valueSerializer, null, parentNames);
return this;
}
/**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}.
* The topics that it may ever send to should be pre-created.
* The sink will use the specified key and value serializers, and the supplied partitioner.
*
* @param name the unique name of the sink
* @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record
* @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
* should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
* {@link StreamsConfig stream configuration}
* @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link StreamsConfig stream configuration}
* @param partitioner the function that should be used to determine the partition for each record processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and dynamically write to topics
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
*/
public synchronized <K, V> Topology addSink(final String name,
final TopicNameExtractor<K, V> topicExtractor,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final StreamPartitioner<? super K, ? super V> partitioner,
final String... parentNames) {
internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valueSerializer, partitioner, parentNames);
return this;
}
/**
* Add a new processor node that receives and processes records output by one or more parent source or processor
* node.
@ -526,7 +655,8 @@ public class Topology {
* @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
public synchronized Topology addProcessor(final String name,
final ProcessorSupplier supplier,

View File

@ -134,6 +134,7 @@ public interface TopologyDescription {
interface Sink extends Node {
/**
* The topic name this sink node is writing to.
* Could be null if the topic name can only be dynamically determined based on a {@code TopicNameExtractor}.
* @return a topic name
*/
String topic();

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
/**
* {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
@ -461,12 +462,31 @@ public interface KStream<K, V> {
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
* started).
*
* @param produced the options to use when producing to the topic
* @param topic the topic name
* @param produced the options to use when producing to the topic
*/
void to(final String topic,
final Produced<K, V> produced);
/**
* Dynamically materialize this stream to topics using default serializers specified in the config and producer's
* {@link DefaultPartitioner}.
* The topic name to send each record to is dynamically determined based on the {@link TopicNameExtractor}.
*
* @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record
*/
void to(final TopicNameExtractor<K, V> topicExtractor);
/**
* Dynamically materialize this stream to topics using the provided {@link Produced} instance.
* The topic name to send each record to is dynamically determined based on the {@link TopicNameExtractor}.
*
* @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record
* @param produced the options to use when producing to the topic
*/
void to(final TopicNameExtractor<K, V> topicExtractor,
final Produced<K, V> produced);
/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).

View File

@ -41,6 +41,8 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
@ -304,27 +306,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
to(topic, Produced.<K, V>with(null, null, null));
}
@SuppressWarnings("unchecked")
@Override
public void to(final String topic, final Produced<K, V> produced) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(produced, "Produced can't be null");
to(topic, new ProducedInternal<>(produced));
to(new StaticTopicNameExtractor<K, V>(topic), new ProducedInternal<>(produced));
}
@Override
public void to(final TopicNameExtractor<K, V> topicExtractor) {
to(topicExtractor, Produced.<K, V>with(null, null, null));
}
@Override
public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
Objects.requireNonNull(produced, "Produced can't be null");
to(topicExtractor, new ProducedInternal<>(produced));
}
@SuppressWarnings("unchecked")
private void to(final String topic, final ProducedInternal<K, V> produced) {
private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
final String name = builder.newProcessorName(SINK_NAME);
final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
if (partitioner == null && keySerializer instanceof WindowedSerializer) {
final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, (WindowedSerializer) keySerializer);
builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name);
final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, windowedPartitioner, this.name);
} else {
builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, partitioner, this.name);
}
}

View File

@ -24,11 +24,9 @@ import static org.apache.kafka.common.utils.Utils.toPositive;
public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
private final String topic;
private final WindowedSerializer<K> serializer;
WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) {
this.topic = topic;
WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
this.serializer = serializer;
}
@ -37,12 +35,14 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
* and the current number of partitions. The partition number is determined by the original key of the windowed key
* using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
*
* @param topic the topic name this record is sent to
* @param windowedKey the key of the record
* @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) {
@Override
public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
// hash the keyBytes to choose a partition

View File

@ -199,7 +199,7 @@ public interface ProcessorContext {
long offset();
/**
* Returns the headers of the current input record
* Returns the headers of the current input record; could be null if it is not available
* @return the headers
*/
Headers headers();

View File

@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.Processor;
/**
* The context associated with the current record being processed by
@ -25,32 +24,32 @@ import org.apache.kafka.streams.processor.Processor;
*/
public interface RecordContext {
/**
* @return The offset of the original record received from Kafka
* @return The offset of the original record received from Kafka;
* could be -1 if it is not available
*/
long offset();
/**
* @return The timestamp extracted from the record received from Kafka
* @return The timestamp extracted from the record received from Kafka;
* could be -1 if it is not available
*/
long timestamp();
/**
* Sets a new timestamp for the output record.
*/
void setTimestamp(final long timestamp);
/**
* @return The topic the record was received on
* @return The topic the record was received on;
* could be null if it is not available
*/
String topic();
/**
* @return The partition the record was received on
* @return The partition the record was received on;
* could be -1 if it is not available
*/
int partition();
/**
* @return The headers from the record received from Kafka
* @return The headers from the record received from Kafka;
* could be null if it is not available
*/
Headers headers();

View File

@ -52,11 +52,12 @@ public interface StreamPartitioner<K, V> {
/**
* Determine the partition number for a record with the given key and value and the current number of partitions.
*
*
* @param topic the topic name this record is sent to
* @param key the key of the record
* @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
Integer partition(K key, V value, int numPartitions);
Integer partition(String topic, K key, V value, int numPartitions);
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* An interface for dynamically determining the name of the Kafka topic to which a record is sent at the sink node of the topology.
*/
@InterfaceStability.Evolving
public interface TopicNameExtractor<K, V> {
/**
* Extracts the topic name to send to. The topic name must already exist, since the Kafka Streams library will not
* try to automatically create the topic with the extracted name.
*
* @param key the record key
* @param value the record value
* @param recordContext current context metadata of the record
* @return the topic name this record should be sent to
*/
String extract(final K key, final V value, final RecordContext recordContext);
}
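A minimal sketch of an implementation (the "priority" header and both topic names are hypothetical; the topics must exist before the application starts):

public class PriorityTopicNameExtractor implements TopicNameExtractor<String, String> {

    // Route records that carry a "priority" header to a dedicated topic, everything else to the default.
    @Override
    public String extract(final String key, final String value, final RecordContext recordContext) {
        final boolean hasPriority = recordContext.headers() != null
            && recordContext.headers().lastHeader("priority") != null;
        return hasPriority ? "orders-priority" : "orders-standard";
    }
}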

View File

@ -42,7 +42,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
private final ThreadCache cache;
private final Serde valueSerde;
private boolean initialized;
protected RecordContext recordContext;
protected ProcessorRecordContext recordContext;
protected ProcessorNode currentNode;
final StateManager stateManager;
@ -178,12 +178,12 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
public void setRecordContext(final RecordContext recordContext) {
public void setRecordContext(final ProcessorRecordContext recordContext) {
this.recordContext = recordContext;
}
@Override
public RecordContext recordContext() {
public ProcessorRecordContext recordContext() {
return recordContext;
}

View File

@ -23,20 +23,18 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
private final Serializer<K> keySerializer;
private final Cluster cluster;
private final String topic;
private final Serializer<K> keySerializer;
private final DefaultPartitioner defaultPartitioner;
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
this.keySerializer = keySerializer;
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
this.cluster = cluster;
this.topic = topic;
this.keySerializer = keySerializer;
this.defaultPartitioner = new DefaultPartitioner();
}
@Override
public Integer partition(final K key, final V value, final int numPartitions) {
public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
final byte[] keyBytes = keySerializer.serialize(topic, key);
return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster);
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@ -34,12 +35,12 @@ public interface InternalProcessorContext extends ProcessorContext {
* Returns the current {@link RecordContext}
* @return the current {@link RecordContext}
*/
RecordContext recordContext();
ProcessorRecordContext recordContext();
/**
* @param recordContext the {@link RecordContext} for the record about to be processed
* @param recordContext the {@link ProcessorRecordContext} for the record about to be processed
*/
void setRecordContext(RecordContext recordContext);
void setRecordContext(ProcessorRecordContext recordContext);
/**
* @param currentNode the current {@link ProcessorNode}

View File

@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@ -319,19 +320,19 @@ public class InternalTopologyBuilder {
}
private class SinkNodeFactory<K, V> extends NodeFactory {
private final String topic;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private final StreamPartitioner<? super K, ? super V> partitioner;
private final TopicNameExtractor<K, V> topicExtractor;
private SinkNodeFactory(final String name,
final String[] predecessors,
final String topic,
final TopicNameExtractor<K, V> topicExtractor,
final Serializer<K> keySerializer,
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner) {
super(name, predecessors.clone());
this.topic = topic;
this.topicExtractor = topicExtractor;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
this.partitioner = partitioner;
@ -339,17 +340,22 @@ public class InternalTopologyBuilder {
@Override
public ProcessorNode build() {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner);
if (topicExtractor instanceof StaticTopicNameExtractor) {
final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName;
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
return new SinkNode<>(name, new StaticTopicNameExtractor<K, V>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
} else {
return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
}
} else {
return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
}
}
@Override
Sink describe() {
return new Sink(name, topic);
return new Sink(name, topicExtractor);
}
}
@ -432,6 +438,18 @@ public class InternalTopologyBuilder {
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
nodeToSinkTopic.put(name, topic);
}
public final <K, V> void addSink(final String name,
final TopicNameExtractor<K, V> topicExtractor,
final Serializer<K> keySerializer,
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner,
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topicExtractor, "topic extractor must not be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
@ -449,8 +467,7 @@ public class InternalTopologyBuilder {
}
}
nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner));
nodeToSinkTopic.put(name, topic);
nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
nodeGrouper.add(name);
nodeGrouper.unite(name, predecessorNames);
}
@ -888,13 +905,18 @@ public class InternalTopologyBuilder {
for (final String predecessor : sinkNodeFactory.predecessors) {
processorMap.get(predecessor).addChild(node);
if (internalTopicNames.contains(sinkNodeFactory.topic)) {
// prefix the internal topic name with the application id
final String decoratedTopic = decorateTopic(sinkNodeFactory.topic);
topicSinkMap.put(decoratedTopic, node);
repartitionTopics.add(decoratedTopic);
} else {
topicSinkMap.put(sinkNodeFactory.topic, node);
if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) {
final String topic = ((StaticTopicNameExtractor) sinkNodeFactory.topicExtractor).topicName;
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
final String decoratedTopic = decorateTopic(topic);
topicSinkMap.put(decoratedTopic, node);
repartitionTopics.add(decoratedTopic);
} else {
topicSinkMap.put(topic, node);
}
}
}
}
@ -1489,17 +1511,26 @@ public class InternalTopologyBuilder {
}
public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
private final String topic;
private final TopicNameExtractor topicNameExtractor;
public Sink(final String name,
final TopicNameExtractor topicNameExtractor) {
super(name);
this.topicNameExtractor = topicNameExtractor;
}
public Sink(final String name,
final String topic) {
super(name);
this.topic = topic;
this.topicNameExtractor = new StaticTopicNameExtractor(topic);
}
@Override
public String topic() {
return topic;
if (topicNameExtractor instanceof StaticTopicNameExtractor)
return ((StaticTopicNameExtractor) topicNameExtractor).topicName;
else
return null;
}
@Override
@ -1509,7 +1540,7 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
return "Sink: " + name + " (topic: " + topic + ")\n <-- " + nodeNames(predecessors);
return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors);
}
@Override
@ -1523,14 +1554,14 @@ public class InternalTopologyBuilder {
final Sink sink = (Sink) o;
return name.equals(sink.name)
&& topic.equals(sink.topic)
&& topicNameExtractor.equals(sink.topicNameExtractor)
&& predecessors.equals(sink.predecessors);
}
@Override
public int hashCode() {
// omit predecessors as it might change and alter the hash code
return Objects.hash(name, topic);
return Objects.hash(name, topicNameExtractor);
}
}

View File

@ -17,16 +17,17 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.RecordContext;
import java.util.Objects;
public class ProcessorRecordContext implements RecordContext {
private long timestamp;
private final long offset;
private final String topic;
private final int partition;
private final Headers headers;
long timestamp;
final long offset;
final String topic;
final int partition;
final Headers headers;
public ProcessorRecordContext(final long timestamp,
final long offset,
@ -41,18 +42,27 @@ public class ProcessorRecordContext implements RecordContext {
this.headers = headers;
}
public long offset() {
return offset;
}
public long timestamp() {
return timestamp;
public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic) {
this(timestamp, offset, partition, topic, null);
}
public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}
@Override
public long offset() {
return offset;
}
@Override
public long timestamp() {
return timestamp;
}
@Override
public String topic() {
return topic;

View File

@ -49,11 +49,11 @@ import java.util.Map;
public class RecordCollectorImpl implements RecordCollector {
private final Logger log;
private final String logPrefix;
private final Sensor skippedRecordsSensor;
private final Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
private final ProductionExceptionHandler productionExceptionHandler;
private final Sensor skippedRecordsSensor;
private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.";
@ -88,7 +88,7 @@ public class RecordCollectorImpl implements RecordCollector {
if (partitioner != null) {
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
if (partitions.size() > 0) {
partition = partitioner.partition(key, value, partitions.size());
partition = partitioner.partition(topic, key, value, partitions.size());
} else {
throw new StreamsException("Could not get partition information for topic '" + topic + "'." +
" This can happen if the topic does not exist.");

View File

@ -19,26 +19,26 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
public class SinkNode<K, V> extends ProcessorNode<K, V> {
private final String topic;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
private final TopicNameExtractor<K, V> topicExtractor;
private final StreamPartitioner<? super K, ? super V> partitioner;
private ProcessorContext context;
private InternalProcessorContext context;
public SinkNode(final String name,
final String topic,
final Serializer<K> keySerializer,
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner) {
SinkNode(final String name,
final TopicNameExtractor<K, V> topicExtractor,
final Serializer<K> keySerializer,
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner) {
super(name);
this.topic = topic;
this.topicExtractor = topicExtractor;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
this.partitioner = partitioner;
@ -83,6 +83,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
}
final String topic = topicExtractor.extract(key, value, this.context.recordContext());
try {
collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
} catch (final ClassCastException e) {
@ -115,7 +117,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
public String toString(final String indent) {
final StringBuilder sb = new StringBuilder(super.toString(indent));
sb.append(indent).append("\ttopic:\t\t");
sb.append(topic);
sb.append(topicExtractor);
sb.append("\n");
return sb.toString();
}

View File

@ -191,7 +191,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
* @throws UnsupportedOperationException on every invocation
*/
@Override
public RecordContext recordContext() {
public ProcessorRecordContext recordContext() {
throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
}
@ -199,7 +199,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
* @throws UnsupportedOperationException on every invocation
*/
@Override
public void setRecordContext(final RecordContext recordContext) {
public void setRecordContext(final ProcessorRecordContext recordContext) {
throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
/**
* A static topic name extractor that always returns the same topic name.
*/
public class StaticTopicNameExtractor<K, V> implements TopicNameExtractor<K, V> {
public final String topicName;
public StaticTopicNameExtractor(final String topicName) {
this.topicName = topicName;
}
public String extract(final K key, final V value, final RecordContext recordContext) {
return topicName;
}
@Override
public String toString() {
return "StaticTopicNameExtractor(" + topicName + ")";
}
}

View File

@ -154,9 +154,7 @@ public class StreamsMetadataState {
return getStreamsMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer,
clusterMetadata,
sourceTopicsInfo.topicWithMostPartitions),
new DefaultStreamPartitioner<>(keySerializer, clusterMetadata),
sourceTopicsInfo);
}
@ -254,7 +252,7 @@ public class StreamsMetadataState {
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo) {
final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
final Integer partition = partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions);
final Set<TopicPartition> matchingPartitions = new HashSet<>();
for (String sourceTopic : sourceTopicsInfo.sourceTopics) {
matchingPartitions.add(new TopicPartition(sourceTopic, partition));

View File

@ -49,7 +49,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements K
abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);
private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) {
return nextFromCache.value.value == null;
return nextFromCache.value.value() == null;
}
@Override

View File

@ -23,8 +23,8 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
@ -87,7 +87,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
final RecordContext current = context.recordContext();
final ProcessorRecordContext current = context.recordContext();
try {
context.setRecordContext(entry.recordContext());
if (flushListener != null) {
@ -179,7 +179,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
return rawValue;
} else {
return entry.value;
return entry.value();
}
}

View File

@ -24,8 +24,8 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
@ -168,7 +168,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
final Bytes binaryKey = cacheFunction.key(entry.key());
final RecordContext current = context.recordContext();
final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.recordContext());
try {
final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic);

View File

@ -24,8 +24,8 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
@ -107,7 +107,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
final Windowed<K> windowedKey,
final InternalProcessorContext context) {
if (flushListener != null) {
final RecordContext current = context.recordContext();
final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.recordContext());
try {
final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
@ -174,7 +174,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
if (entry == null) {
return underlying.fetch(key, timestamp);
} else {
return entry.value;
return entry.value();
}
}

View File

@ -17,21 +17,18 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import java.util.Arrays;
import java.util.Objects;
/**
* A cache entry
*/
class LRUCacheEntry implements RecordContext {
class LRUCacheEntry extends ProcessorRecordContext {
public final byte[] value;
private final Headers headers;
private final long offset;
private final String topic;
private final int partition;
private long timestamp;
private long sizeBytes;
private final byte[] value;
private final long sizeBytes;
private boolean isDirty;
LRUCacheEntry(final byte[] value) {
@ -45,13 +42,9 @@ class LRUCacheEntry implements RecordContext {
final long timestamp,
final int partition,
final String topic) {
super(timestamp, offset, partition, topic, headers);
this.value = value;
this.headers = headers;
this.partition = partition;
this.topic = topic;
this.offset = offset;
this.isDirty = isDirty;
this.timestamp = timestamp;
this.sizeBytes = (value == null ? 0 : value.length) +
1 + // isDirty
8 + // timestamp
@ -60,36 +53,6 @@ class LRUCacheEntry implements RecordContext {
(topic == null ? 0 : topic.length());
}
@Override
public long offset() {
return offset;
}
@Override
public long timestamp() {
return timestamp;
}
@Override
public void setTimestamp(final long timestamp) {
throw new UnsupportedOperationException();
}
@Override
public String topic() {
return topic;
}
@Override
public int partition() {
return partition;
}
@Override
public Headers headers() {
return headers;
}
void markClean() {
isDirty = false;
}
@ -98,9 +61,30 @@ class LRUCacheEntry implements RecordContext {
return isDirty;
}
public long size() {
long size() {
return sizeBytes;
}
byte[] value() {
return value;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final LRUCacheEntry that = (LRUCacheEntry) o;
return timestamp() == that.timestamp() &&
offset() == that.offset() &&
partition() == that.partition() &&
Objects.equals(topic(), that.topic()) &&
Objects.equals(headers(), that.headers()) &&
Arrays.equals(this.value, that.value()) &&
this.isDirty == that.isDirty();
}
@Override
public int hashCode() {
return Objects.hash(timestamp(), offset(), topic(), partition(), headers(), value, isDirty);
}
}

View File

@ -44,7 +44,7 @@ class MergedSortedCacheKeyValueBytesStoreIterator extends AbstractMergedSortedCa
@Override
byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
return cacheEntry.value;
return cacheEntry.value();
}
@Override

View File

@ -53,7 +53,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto
@Override
byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
return cacheEntry.value;
return cacheEntry.value();
}
@Override

View File

@ -48,7 +48,7 @@ class MergedSortedCacheWindowStoreIterator extends AbstractMergedSortedCacheStor
@Override
byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
return cacheEntry.value;
return cacheEntry.value();
}
@Override

View File

@ -61,7 +61,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
@Override
byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
return cacheEntry.value;
return cacheEntry.value();
}
@Override

View File

@ -121,7 +121,7 @@ class NamedCache {
// evicted already been removed from the cache so add it to the list of
// flushed entries and remove from dirtyKeys.
if (evicted != null) {
entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value, evicted.entry));
entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value(), evicted.entry));
dirtyKeys.remove(evicted.key);
}
@ -130,9 +130,9 @@ class NamedCache {
if (node == null) {
throw new IllegalStateException("Key = " + key + " found in dirty key set, but entry is null");
}
entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry));
entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), node.entry));
node.entry.markClean();
if (node.entry.value == null) {
if (node.entry.value() == null) {
deleted.add(node.key);
}
}

View File

@ -19,7 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
@ -332,9 +332,9 @@ public class ThreadCache {
static class DirtyEntry {
private final Bytes key;
private final byte[] newValue;
private final RecordContext recordContext;
private final ProcessorRecordContext recordContext;
DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext recordContext) {
DirtyEntry(final Bytes key, final byte[] newValue, final ProcessorRecordContext recordContext) {
this.key = key;
this.newValue = newValue;
this.recordContext = recordContext;
@ -348,7 +348,7 @@ public class ThreadCache {
return newValue;
}
public RecordContext recordContext() {
public ProcessorRecordContext recordContext() {
return recordContext;
}
}

View File

@ -406,7 +406,7 @@ public class KafkaStreamsTest {
public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
@Override
public Integer partition(final String key, final Object value, final int numPartitions) {
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return 0;
}
});

View File

@ -21,6 +21,7 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@ -95,7 +96,12 @@ public class TopologyTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicWhenAddingSink() {
topology.addSink("name", null);
topology.addSink("name", (String) null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicChooserWhenAddingSink() {
topology.addSink("name", (TopicNameExtractor) null);
}
@Test(expected = NullPointerException.class)

View File

@ -38,10 +38,12 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
@ -50,6 +52,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -220,6 +223,26 @@ public class KStreamImplTest {
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f")));
}
@Test
public void shouldSendDataToDynamicTopics() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(input, stringConsumed);
stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1),
Produced.with(Serdes.String(), Serdes.String()));
builder.stream(input + "-a-v", stringConsumed).process(processorSupplier);
builder.stream(input + "-b-v", stringConsumed).process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(input, "a", "v1"));
driver.pipeInput(recordFactory.create(input, "a", "v2"));
driver.pipeInput(recordFactory.create(input, "b", "v1"));
}
List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
assertThat(mockProcessors.get(0).processed, equalTo(Utils.mkList("a:v1", "a:v2")));
assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList("b:v1")));
}
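Outside of the test harness, the same dynamic routing is available on the DSL through the new to(TopicNameExtractor, Produced) variant. A self-contained sketch follows; the "events" input topic and the class name are illustrative assumptions, and the lambda mirrors the extractor used in the test above (TopicNameExtractor has a single abstract method, so a lambda is sufficient).

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class DynamicRoutingExample {
    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> events = builder.stream("events");

        // Route each record to "<source-topic>-<key>", as the test above does.
        events.to((key, value, recordContext) -> recordContext.topic() + "-" + key,
                  Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}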
@Test
public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
final StreamsBuilder builder = new StreamsBuilder();
@ -300,12 +323,12 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnFlatMapValues() {
testStream.flatMapValues((ValueMapper) null);
testStream.flatMapValues((ValueMapper<? super String, ? extends Iterable<? extends String>>) null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
testStream.flatMapValues((ValueMapperWithKey) null);
testStream.flatMapValues((ValueMapperWithKey<? super String, ? super String, ? extends Iterable<? extends String>>) null);
}
@Test(expected = IllegalArgumentException.class)
@ -325,7 +348,12 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicOnTo() {
testStream.to(null);
testStream.to((String) null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicChooserOnTo() {
testStream.to((TopicNameExtractor<String, String>) null);
}
@Test(expected = NullPointerException.class)

View File

@ -57,7 +57,7 @@ public class WindowedStreamPartitionerTest {
final Random rand = new Random();
final DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
final WindowedSerializer<Integer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer);
final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, timeWindowedSerializer);
final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer);
for (int k = 0; k < 10; k++) {
Integer key = rand.nextInt();
@ -72,7 +72,7 @@ public class WindowedStreamPartitionerTest {
TimeWindow window = new TimeWindow(10 * w, 20 * w);
Windowed<Integer> windowedKey = new Windowed<>(key, window);
Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
Integer actual = streamPartitioner.partition(topicName, windowedKey, value, infos.size());
assertEquals(expected, actual);
}
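These updates reflect that StreamPartitioner#partition now receives the output topic name as its first argument: with a TopicNameExtractor in play the topic is only known per record, so the partitioner must be told which topic it is partitioning for. Below is a minimal sketch against the new signature; the class name and the hashing strategy are illustrative assumptions.

import org.apache.kafka.streams.processor.StreamPartitioner;

public class KeyHashStreamPartitioner implements StreamPartitioner<String, Object> {
    @Override
    public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
        // Non-negative hash of the key; the topic argument is available when
        // routing-dependent placement is needed.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}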

View File

@ -49,7 +49,7 @@ public class AbstractProcessorContextTest {
private final AbstractProcessorContext context = new TestProcessorContext(metrics);
private final MockStateStore stateStore = new MockStateStore("store", false);
private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers);
private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);
@Before
public void before() {
@ -95,7 +95,7 @@ public class AbstractProcessorContextTest {
@Test
public void shouldReturnNullIfTopicEqualsNonExistTopic() {
context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
assertThat(context.topic(), nullValue());
}
@ -153,7 +153,7 @@ public class AbstractProcessorContextTest {
@Test
public void shouldReturnNullIfHeadersAreNotSet() {
context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
assertThat(context.headers(), nullValue());
}

View File

@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@ -411,7 +412,12 @@ public class InternalTopologyBuilderTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicWhenAddingSink() {
builder.addSink("name", null, null, null, null);
builder.addSink("name", (String) null, null, null, null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicChooserWhenAddingSink() {
builder.addSink("name", (TopicNameExtractor) null, null, null, null);
}
@Test(expected = NullPointerException.class)
@ -456,7 +462,7 @@ public class InternalTopologyBuilderTest {
@Test(expected = NullPointerException.class)
public void shouldNotAddNullStateStoreSupplier() {
builder.addStateStore((StoreBuilder) null);
builder.addStateStore(null);
}
private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {

View File

@ -420,7 +420,7 @@ public class ProcessorTopologyTest {
private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
return new StreamPartitioner<Object, Object>() {
@Override
public Integer partition(final Object key, final Object value, final int numPartitions) {
public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) {
return partition;
}
};

View File

@ -73,7 +73,7 @@ public class RecordCollectorTest {
private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
@Override
public Integer partition(final String key, final Object value, final int numPartitions) {
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return Integer.parseInt(key) % numPartitions;
}
};

View File

@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
public class RecordContextStub implements RecordContext {
private final long offset;
private long timestamp;
private final int partition;
private final String topic;
private final Headers headers;
public RecordContextStub() {
this(-1, -1, -1, "", null);
}
public RecordContextStub(final long offset,
final long timestamp,
final int partition,
final String topic,
final Headers headers) {
this.offset = offset;
this.timestamp = timestamp;
this.partition = partition;
this.topic = topic;
this.headers = headers;
}
public RecordContextStub(final long offset,
final long timestamp,
final int partition,
final String topic) {
this(offset, timestamp, partition, topic, null);
}
@Override
public long offset() {
return offset;
}
@Override
public long timestamp() {
return timestamp;
}
@Override
public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}
@Override
public String topic() {
return topic;
}
@Override
public int partition() {
return partition;
}
@Override
public Headers headers() {
return headers;
}
}

View File

@ -47,7 +47,7 @@ public class SinkNodeTest {
new Metrics().sensor("skipped-records")
)
);
private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
private final SinkNode sink = new SinkNode<>("anyNodeName", new StaticTopicNameExtractor("any-output-topic"), anySerializer, anySerializer, null);
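Here a fixed output topic is wrapped in a StaticTopicNameExtractor so that static and dynamic sinks share one code path inside SinkNode. Roughly, such a wrapper returns the configured name for every record; the sketch below is an assumption about its shape, not the class shipped in this change.

import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class FixedTopicNameExtractor<K, V> implements TopicNameExtractor<K, V> {
    private final String topicName;

    public FixedTopicNameExtractor(final String topicName) {
        this.topicName = topicName;
    }

    @Override
    public String extract(final K key, final V value, final RecordContext recordContext) {
        return topicName;   // every record goes to the same topic
    }
}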
@Before
public void before() {

View File

@ -65,7 +65,6 @@ public class StreamsMetadataStateTest {
private TopicPartition topic1P1;
private TopicPartition topic2P1;
private TopicPartition topic4P0;
private List<PartitionInfo> partitionInfos;
private Cluster cluster;
private final String globalTable = "global-table";
private StreamPartitioner<String, Object> partitioner;
@ -113,7 +112,7 @@ public class StreamsMetadataStateTest {
hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1));
hostToPartitions.put(hostThree, Collections.singleton(topic3P0));
partitionInfos = Arrays.asList(
final List<PartitionInfo> partitionInfos = Arrays.asList(
new PartitionInfo("topic-one", 0, null, null, null),
new PartitionInfo("topic-one", 1, null, null, null),
new PartitionInfo("topic-two", 0, null, null, null),
@ -126,7 +125,7 @@ public class StreamsMetadataStateTest {
discovery.onChange(hostToPartitions, cluster);
partitioner = new StreamPartitioner<String, Object>() {
@Override
public Integer partition(final String key, final Object value, final int numPartitions) {
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return 1;
}
};
@ -246,7 +245,7 @@ public class StreamsMetadataStateTest {
final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
@Override
public Integer partition(final String key, final Object value, final int numPartitions) {
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return 2;
}
});

View File

@ -72,8 +72,8 @@ public class NamedCacheTest {
cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
LRUCacheEntry head = cache.first();
LRUCacheEntry tail = cache.last();
assertEquals(new String(head.value), toInsert.get(i).value);
assertEquals(new String(tail.value), toInsert.get(0).value);
assertEquals(new String(head.value()), toInsert.get(i).value);
assertEquals(new String(tail.value()), toInsert.get(0).value);
assertEquals(cache.flushes(), 0);
assertEquals(cache.hits(), 0);
assertEquals(cache.misses(), 0);
@ -116,9 +116,9 @@ public class NamedCacheTest {
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{11}));
cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{12}));
assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value);
assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value);
assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value);
assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value());
assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value());
assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value());
assertEquals(cache.hits(), 3);
}
@ -128,15 +128,15 @@ public class NamedCacheTest {
cache.putIfAbsent(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{20}));
cache.putIfAbsent(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{30}));
assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value);
assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value);
assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value());
assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value());
}
@Test
public void shouldDeleteAndUpdateSize() {
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}));
final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0}));
assertArrayEquals(new byte[] {10}, deleted.value);
assertArrayEquals(new byte[] {10}, deleted.value());
assertEquals(0, cache.sizeInBytes());
}
@ -146,9 +146,9 @@ public class NamedCacheTest {
KeyValue.pair(new byte[] {1}, new LRUCacheEntry(new byte[]{1})),
KeyValue.pair(new byte[] {2}, new LRUCacheEntry(new byte[]{2}))));
assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value);
assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value);
assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value);
assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value());
assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value());
assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value());
}
@Test
@ -157,7 +157,7 @@ public class NamedCacheTest {
KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{1})),
KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{2}))));
assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value);
assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value());
assertEquals(cache.overwrites(), 2);
}

View File

@ -66,7 +66,7 @@ public class ThreadCacheTest {
Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
LRUCacheEntry entry = cache.get(namespace, key);
assertEquals(entry.isDirty(), true);
assertEquals(new String(entry.value), kvToInsert.value);
assertEquals(new String(entry.value()), kvToInsert.value);
}
assertEquals(cache.gets(), 5);
assertEquals(cache.puts(), 5);
@ -188,7 +188,7 @@ public class ThreadCacheTest {
final Bytes key = Bytes.wrap(new byte[]{0});
cache.put(namespace, key, dirtyEntry(key.get()));
assertEquals(key.get(), cache.delete(namespace, key).value);
assertEquals(key.get(), cache.delete(namespace, key).value());
assertNull(cache.get(namespace, key));
}
@ -204,7 +204,7 @@ public class ThreadCacheTest {
}
});
cache.put(namespace, key, dirtyEntry(key.get()));
assertEquals(key.get(), cache.delete(namespace, key).value);
assertEquals(key.get(), cache.delete(namespace, key).value());
// flushing should have no further effect
cache.flush(namespace);
@ -235,8 +235,8 @@ public class ThreadCacheTest {
cache.put(namespace1, nameByte, dirtyEntry(nameByte.get()));
cache.put(namespace2, nameByte, dirtyEntry(name1Byte.get()));
assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value);
assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value);
assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value());
assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value());
}
@Test
@ -413,8 +413,8 @@ public class ThreadCacheTest {
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value);
assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value);
assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value());
assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value());
}
@Test
@ -436,8 +436,8 @@ public class ThreadCacheTest {
final Bytes key = Bytes.wrap(new byte[]{10});
final byte[] value = {30};
assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value)));
assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value);
assertArrayEquals(value, cache.get(namespace, key).value);
assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value());
assertArrayEquals(value, cache.get(namespace, key).value());
}
@Test

View File

@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@ -249,6 +249,38 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def to(topic: String)(implicit produced: Produced[K, V]): Unit =
inner.to(topic, produced)
/**
* Dynamically materialize this stream to topics using the `Produced` instance for
* configuration of the key serde, value serde, and `StreamPartitioner`.
* The topic name for each record is dynamically determined by the given `TopicNameExtractor`.
* <p>
* The user can either supply the `Produced` instance as an implicit in scope or provide implicit
* key and value serdes that will be converted to a `Produced` instance implicitly.
* <p>
* {{{
* Example:
*
* // brings implicit serdes in scope
* import Serdes._
*
* //..
* val clicksPerRegion: KStream[String, Long] = //..
* val topicChooser: TopicNameExtractor[String, Long] = //..
*
* // Implicit serdes in scope will generate an implicit Produced instance, which
* // will be passed automatically to the call of to below
* clicksPerRegion.to(topicChooser)
*
* // Similarly you can create an implicit Produced and it will be passed implicitly
* // to the call of to
* }}}
*
* @param extractor the extractor to determine the name of the Kafka topic to write to for each record
* @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
* @see `org.apache.kafka.streams.kstream.KStream#to`
*/
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
inner.to(extractor, produced)
/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).