diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 724daac650b..15d6d8b57fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -55,26 +55,26 @@ import java.util.concurrent.atomic.AtomicInteger; * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. *

+ *
  * A simple example might look like this:
  * <pre>

  *    Map<String, Object> props = new HashMap<>();
- *    props.put("bootstrap.servers", "localhost:4242");
- *    props.put("key.deserializer", StringDeserializer.class);
- *    props.put("value.deserializer", StringDeserializer.class);
- *    props.put("key.serializer", StringSerializer.class);
- *    props.put("value.serializer", IntegerSerializer.class);
- *    props.put("timestamp.extractor", MyTimestampExtractor.class);
+ *    props.put(StreamsConfig.JOB_ID_CONFIG, "my-job");
+ *    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ *    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ *    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ *    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ *    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  *    StreamsConfig config = new StreamsConfig(props);
  *
  *    KStreamBuilder builder = new KStreamBuilder();
- *    builder.from("topic1").mapValue(value -> value.length()).to("topic2");
+ *    builder.from("my-input-topic").mapValues(value -> Integer.toString(value.length())).to("my-output-topic");
  *
  *    KafkaStreams streams = new KafkaStreams(builder, config);
  *    streams.start();
  * </pre>
* */ -// TODO: about example may need to be updated after KAFKA-3153 @InterfaceStability.Unstable public class KafkaStreams { diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 65ec96965ad..c4b8ffe5d7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -35,6 +35,10 @@ import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; +/** + * Configuration for Kafka Streams. Documentation for these configurations can be found in the Kafka documentation + */ public class StreamsConfig extends AbstractConfig { private static final ConfigDef CONFIG; diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java index d392eef2ef1..70c332092ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams; import org.apache.kafka.common.metrics.Sensor; /** - * The stream metrics interface for adding metric sensors and collecting metric values. + * The Kafka Streams metrics interface for adding metric sensors and collecting metric values. */ public interface StreamsMetrics { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 231eb22113c..6426af9d915 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -22,13 +22,11 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorSupplier; - -// TODO: Javadoc needs to be updated /** - * KStream is an abstraction of a stream of key-value pairs. + * KStream is an abstraction of an event stream in key-value pairs. * - * @param the type of keys - * @param the type of values + * @param Type of keys + * @param Type of values */ public interface KStream { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index a2c639787d8..485bb20624d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -21,13 +21,11 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; -// TODO: Javadoc needs to be updated. /** - * KTable is an abstraction of a change log stream. + * KTable is an abstraction of a change log stream from a primary-keyed table. 
* - * - * @param the type of keys - * @param the type of values + * @param Type of primary keys + * @param Type of value changes */ public interface KTable { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index b67f619b9ac..47198e4991d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -19,6 +19,13 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.ProcessorContext; +/** + * A stateful Transformer interface for transform a key-value pair into a new value. + * + * @param Key type. + * @param Value type. + * @param Return type. + */ public interface Transformer { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java index 93d930df93b..fc7ba60d08e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream; +/** + * A transformer supplier which can create one or more {@link Transformer} instances. + */ public interface TransformerSupplier { Transformer get(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 06882b3709c..7cadfb4ff1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -22,6 +22,9 @@ import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import java.util.HashMap; import java.util.Map; +/** + * The unlimited window specifications. + */ public class UnlimitedWindows extends Windows { private static final long DEFAULT_START_TIMESTAMP = 0L; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 5b9e2ff2d1e..b4d2b38213c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -19,6 +19,12 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.ProcessorContext; +/** + * A stateful Value Transformer interface for transform a value into a new value. + * + * @param Value type. + * @param Return type. + */ public interface ValueTransformer { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java index 04fa9eb0a96..6bc86bc1744 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream; +/** + * A value transformer supplier which can create one or more {@link ValueTransformer} instances. 
+ */ public interface ValueTransformerSupplier { ValueTransformer get(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index 845f9e91caa..22d52aa5e26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -17,6 +17,15 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +/** + * The windowed key interface used in {@link KTable}, used for representing a windowed table result from windowed stream aggregations, + * i.e. {@link KStream#aggregateByKey(Initializer, Aggregator, Windows, Serializer, Serializer, Deserializer, Deserializer)} + * + * @param Type of the key + */ public class Windowed { private T value; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index dad5c6f732b..06681ac51e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -30,7 +30,7 @@ import java.util.Map; import java.util.Set; /** - * DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed by the join processing in KStream. + * Default implementation of the {@link PartitionGrouper} interface that groups partitions by the partition id. * * Join operations requires that topics of the joining entities are copartitoned, i.e., being partitioned by the same key and having the same * number of partitions. Copartitioning is ensured by having the same number of partitions on diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index f8311e7068b..ae9844de41a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -23,6 +23,13 @@ import org.apache.kafka.common.TopicPartition; import java.util.Map; import java.util.Set; +/** + * A partition grouper that generates partition groups given the list of topic-partitions. + * + * This grouper also acts as the stream task creation function along with partition distribution + * such that each generated partition group is assigned with a distinct {@link TaskId}; + * the created task ids will then be assigned to Kafka Streams instances that host the stream job. + */ public interface PartitionGrouper { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java index 719d3ac2a34..65618999b1e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java @@ -17,6 +17,16 @@ package org.apache.kafka.streams.processor; +/** + * A processor supplier that can create one or more {@link Processor} instances. 
+ * + * It is used in {@link TopologyBuilder} for adding new processor operators, whose generated + * topology can then be replicated (and thus creating one or more {@link Processor} instances) + * and distributed to multiple stream threads. + * + * @param the type of keys + * @param the type of values + */ public interface ProcessorSupplier { Processor get(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index ce0ba700907..224d5800936 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -20,7 +20,8 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord} + * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord}. + * The extracted timestamp is defined as milliseconds. */ public interface TimestampExtractor { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index 55ec8cf487c..cdb3de5f90a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -33,5 +33,5 @@ import java.util.Iterator; public interface KeyValueIterator extends Iterator>, Closeable { @Override - public void close(); + void close(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index f296230aa10..3e7f6fbc8c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -39,7 +39,7 @@ public interface KeyValueStore extends StateStore { * @return The value or null if no value is found. * @throws NullPointerException If null is used for key. */ - abstract public V get(K key); + V get(K key); /** * Update the value associated with this key @@ -48,7 +48,7 @@ public interface KeyValueStore extends StateStore { * @param value The value * @throws NullPointerException If null is used for key or value. */ - abstract public void put(K key, V value); + void put(K key, V value); /** * Update the value associated with this key, unless a value @@ -59,7 +59,7 @@ public interface KeyValueStore extends StateStore { * @return The old value or null if there is no such key. * @throws NullPointerException If null is used for key or value. */ - abstract public V putIfAbsent(K key, V value); + V putIfAbsent(K key, V value); /** * Update all the given key/value pairs @@ -67,7 +67,7 @@ public interface KeyValueStore extends StateStore { * @param entries A list of entries to put into the store. * @throws NullPointerException If null is used for any key or value. */ - abstract public void putAll(List> entries); + void putAll(List> entries); /** * Delete the value from the store (if there is one) @@ -76,7 +76,7 @@ public interface KeyValueStore extends StateStore { * @return The old value or null if there is no such key. * @throws NullPointerException If null is used for key. 
*/ - abstract public V delete(K key); + V delete(K key); /** * Get an iterator over a given range of keys. This iterator MUST be closed after use. @@ -86,13 +86,13 @@ public interface KeyValueStore extends StateStore { * @return The iterator for this range. * @throws NullPointerException If null is used for from or to. */ - abstract public KeyValueIterator range(K from, K to); + KeyValueIterator range(K from, K to); /** * Return an iterator over all keys in the database. This iterator MUST be closed after use. * * @return An iterator of all key/value pairs in the store. */ - abstract public KeyValueIterator all(); + KeyValueIterator all(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java index e1e78afe068..e92531210b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java @@ -27,6 +27,12 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +/** + * Factory for creating serializers / deserializers for state stores in Kafka Streams. + * + * @param key type of serdes + * @param value type of serdes + */ public final class Serdes { public static Serdes withBuiltinTypes(String topic, Class keyClass, Class valueClass) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index e9d82bcf381..e803832ba8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; /** - * Factory for creating key-value stores. + * Factory for creating state stores in Kafka Streams. */ public class Stores { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index 08cd049b02e..7c474dd60bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -23,6 +23,11 @@ import org.apache.kafka.streams.KeyValue; import java.util.Iterator; +/** + * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}. + * + * @param Type of values + */ public interface WindowStoreIterator extends Iterator> { void close(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java index 3a3d5857619..c6bbb232c5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java @@ -21,7 +21,7 @@ package org.apache.kafka.streams.state; import java.nio.ByteBuffer; -public class WindowStoreUtils { +public class WindowStoreUtils { public static final int TIMESTAMP_SIZE = 8; public static final int SEQNUM_SIZE = 4;
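
For reference, the usage snippet in the KafkaStreams class Javadoc above expands into a complete driver class along the following lines. This is a minimal sketch: the job id, broker address, topic names, and class name are placeholders, and the imports assume the package layout at this revision (for example, KStreamBuilder under org.apache.kafka.streams.kstream).

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class PipeJob {
        public static void main(String[] args) {
            // Configure the job with the constants documented in StreamsConfig.
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.JOB_ID_CONFIG, "pipe-job");                  // placeholder job id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            StreamsConfig config = new StreamsConfig(props);

            // Pipe records unchanged from the input topic to the output topic.
            KStreamBuilder builder = new KStreamBuilder();
            builder.from("my-input-topic").to("my-output-topic");

            KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();
        }
    }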
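
The TimestampExtractor Javadoc added here specifies that the extracted timestamp is in milliseconds. As an illustration, a wall-clock based extractor could look like the sketch below; the single extract(ConsumerRecord) method signature is assumed from the interface at this revision, and the class name is hypothetical. Such a class would be registered through the "timestamp.extractor" property shown in the removed configuration snippet.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Hypothetical extractor that stamps every record with the current wall-clock time.
    public class WallClockTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            // Return the timestamp in milliseconds, as the interface documentation requires.
            return System.currentTimeMillis();
        }
    }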
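
The new ProcessorSupplier Javadoc notes that a single supplier may create one or more Processor instances as the topology is replicated across stream threads. A sketch of such a supplier follows; it assumes the Processor contract at this revision (init, process, punctuate, close) and uses a hypothetical class name.

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.ProcessorSupplier;

    // Hypothetical supplier that hands out a fresh Processor on every get() call,
    // so each stream task owns its own instance.
    public class UpperCaseProcessorSupplier implements ProcessorSupplier<String, String> {

        @Override
        public Processor<String, String> get() {
            return new Processor<String, String>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public void process(String key, String value) {
                    // Forward the upper-cased value downstream.
                    context.forward(key, value == null ? null : value.toUpperCase());
                }

                @Override
                public void punctuate(long timestamp) {
                    // Stateless processor: nothing scheduled.
                }

                @Override
                public void close() {
                    // Nothing to clean up.
                }
            };
        }
    }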
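
Finally, the KeyValueStore Javadoc stresses that iterators returned by range() and all() MUST be closed after use, and this patch drops the redundant modifiers from KeyValueIterator.close() and the store methods. A hypothetical helper illustrating that contract, using only the method signatures listed in the patch:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical helper that sums the values in a key range and always closes the iterator.
    public final class StoreRangeSum {

        public static long sumRange(KeyValueStore<String, Long> store, String from, String to) {
            KeyValueIterator<String, Long> iter = store.range(from, to);
            try {
                long total = 0L;
                while (iter.hasNext()) {
                    KeyValue<String, Long> entry = iter.next();
                    total += entry.value;
                }
                return total;
            } finally {
                iter.close(); // range() and all() iterators MUST be closed after use
            }
        }

        private StoreRangeSum() { }
    }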