Commit Graph

2334 Commits

Author SHA1 Message Date
Guozhang Wang d05fa0a03b KAFKA-2804: manage changelog topics through ZK in PartitionAssignor
Author: Guozhang Wang <wangguoz@gmail.com>
Author: wangguoz@gmail.com <guozhang@Guozhang-Macbook.local>
Author: Guozhang Wang <guozhang@Guozhang-Macbook.local>

Reviewers: Yasuhiro Matsuda

Closes #579 from guozhangwang/K2804
2015-12-07 15:12:09 -08:00
Yasuhiro Matsuda 39c3512ece KAFKA-2856: Add KTable non-stateful APIs along with standby task support
guozhangwang
* added KTable API and impl
* added standby support for KTable

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #604 from ymatsuda/add_ktable
2015-12-04 14:59:24 -08:00
bbejeck a5382a333e KAFKA-2902: streaming config use get base consumer configs.
Changes made for using getBaseConsumerConfigs from StreamingConfig.getConsumerConfigs.

Author: bbejeck <bbejeck@gmail.com>
Author: Bill Bejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang

Closes #596 from bbejeck/KAFKA-2902-StreamingConfig-use-getBaseConsumerConfigs
2015-12-02 18:40:07 -08:00
Yasuhiro Matsuda d07bb18140 MINOR: comments on KStream methods, and fix generics
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #591 from ymatsuda/comments
2015-11-25 16:44:43 -08:00
Yasuhiro Matsuda 5e8958a856 MINOR: initialize Serdes with ProcessorContext
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #589 from ymatsuda/init_serdes_with_procctx
2015-11-25 15:21:17 -08:00
Yasuhiro Matsuda e14ae66e72 MINOR: change KStream processor names
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #587 from ymatsuda/kstream_processor_names
2015-11-25 14:53:24 -08:00
Yasuhiro Matsuda 617a91a236 HOTFIX: fix StreamTask.close()
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #586 from ymatsuda/fix_streamtask_close
2015-11-25 12:04:24 -08:00
Bill Bejeck 84c8d2bb86 KAFKA-2872: unite sink nodes with parent nodes in addSink
Starting a KafkaStream failed with an error because the TopologyBuilder.addSink method was not connecting the sink with its parent processor(s)/source(s). The fix wires up the sink with its parent(s) in TopologyBuilder.addSink.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang

Closes #572 from bbejeck/KAFKA-2872_kafka_stream_sink_not_connected_to_parent
2015-11-21 18:46:15 -08:00
Yasuhiro Matsuda 0158623480 MINOR: remove the group id from a restore consumer
guozhangwang
A restore consumer does not belong to a consumer group.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #543 from ymatsuda/no_group_for_restore_consumer
2015-11-17 17:39:21 -08:00
Yasuhiro Matsuda 1a36af80b7 MINOR: add KStream merge operator
guozhangwang

Added KStreamBuilder.merge(KStream...).

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #536 from ymatsuda/kstream_merge_operator
2015-11-17 17:34:54 -08:00
Yasuhiro Matsuda 4a3d244a2c MINOR: do not create a StandbyTask if there is no state store in the task
guozhangwang
An optimization which may reduce unnecessary polls for standby tasks.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #535 from ymatsuda/remove_empty_standby_task
2015-11-16 14:09:27 -08:00
Yasuhiro Matsuda 45e7f71309 KAFKA-2811: add standby tasks
guozhangwang
* added a new config param "num.standby.replicas" (the default value is 0).
* added a new abstract class AbstractTask
* added StandbyTask as a subclass of AbstractTask
* modified StreamTask to a subclass of AbstractTask
* StreamThread
  * standby tasks are created by calling StreamThread.addStandbyTask() from onPartitionsAssigned()
  * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionRevoked()
  * In addStandbyTasks(), change log partitions are assigned to restoreConsumer.
  * In removeStandbyTasks(), change log partitions are removed from restoreConsumer.
  * StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0.
  * If records are returned, StreamThread calls StandbyTask.update and passes the records to each standby task.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #526 from ymatsuda/standby_task
2015-11-16 13:34:42 -08:00
Grant Henke 370ce2b4b7 KAFKA-2815: Fix KafkaStreamingPartitionAssignorTest.testSubscription
Fails when order of elements is incorrect

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #510 from granthenke/streams-test
2015-11-12 09:59:47 -08:00
Yasuhiro Matsuda 124f73b174 KAFKA-2763: better stream task assignment
guozhangwang

When a rebalance happens, each consumer reports the following information to the coordinator:
* Client UUID (a unique id assigned to an instance of KafkaStreaming)
* Task ids of previously running tasks
* Task ids of valid local states on the client's state directory

TaskAssignor does the following:
* Assign a task to a client which was running it previously. If there is no such client, assign a task to a client which has its valid local state.
* Try to balance the load among stream threads.
  * A client may have more than one stream thread. The assignor tries to assign tasks to a client proportionally to the number of threads.
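
The assignment rules above can be illustrated with a small sketch. This is not the `TaskAssignor` from the commit: the class name, the `ClientInfo` fields, and the tie-breaking rule (fewest assigned tasks per thread) are all assumptions made for illustration, and integer task ids stand in for the real ones.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: names and tie-breaking are assumptions, not the commit's TaskAssignor.
final class StickyAssignmentSketch {

    /** What one client reports during a rebalance. */
    static final class ClientInfo {
        final String clientId;          // stands in for the client UUID
        final Set<Integer> prevTasks;   // tasks the client was running previously
        final Set<Integer> stateTasks;  // tasks with valid local state on disk
        final int numThreads;           // number of stream threads (must be > 0)

        ClientInfo(String clientId, Set<Integer> prevTasks, Set<Integer> stateTasks, int numThreads) {
            this.clientId = clientId;
            this.prevTasks = prevTasks;
            this.stateTasks = stateTasks;
            this.numThreads = numThreads;
        }
    }

    /** Returns client id -> assigned task ids, in client order. */
    static Map<String, List<Integer>> assign(List<ClientInfo> clients, List<Integer> tasks) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (ClientInfo client : clients) {
            assignment.put(client.clientId, new ArrayList<Integer>());
        }
        for (Integer task : tasks) {
            ClientInfo best = null;
            int bestRank = Integer.MAX_VALUE;
            double bestLoad = Double.MAX_VALUE;
            for (ClientInfo client : clients) {
                // Rank 0: previous owner. Rank 1: has local state. Rank 2: anyone else.
                int rank = client.prevTasks.contains(task) ? 0
                        : client.stateTasks.contains(task) ? 1 : 2;
                // Load is measured per thread so that clients with more threads receive more tasks.
                double load = (double) assignment.get(client.clientId).size() / client.numThreads;
                if (rank < bestRank || (rank == bestRank && load < bestLoad)) {
                    best = client;
                    bestRank = rank;
                    bestLoad = load;
                }
            }
            if (best == null) {
                throw new IllegalArgumentException("no clients to assign tasks to");
            }
            assignment.get(best.clientId).add(task);
        }
        return assignment;
    }
}
```

In this simplified form stickiness always wins over balance; a real assignor would presumably need to trade the two off against each other.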

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #497 from ymatsuda/task_assignment
2015-11-11 16:14:27 -08:00
Jason Gustafson c39e79bb5a KAFKA-2691: Improve handling of authorization failure during metadata refresh
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao

Closes #394 from hachikuji/KAFKA-2691
2015-11-04 11:02:30 -08:00
Yasuhiro Matsuda 421de0a3f9 KAFKA-2727: Topology partial construction
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #411 from ymatsuda/topology_partial_construction
2015-11-04 09:57:50 -08:00
Yasuhiro Matsuda e466ccd711 KAFKA-2707: make KStream processor names deterministic
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #408 from ymatsuda/kstream_processor_name
2015-11-02 14:40:52 -08:00
Yasuhiro Matsuda 758272267c KAFKA-2706: make state stores first class citizens in the processor topology
* Added StateStoreSupplier
* StateStore
  * Added init(ProcessorContext context) method
* TopologyBuilder
  * Added addStateStore(StateStoreSupplier supplier, String... processNames)
  * Added connectProcessorAndStateStores(String processorName, String... stateStoreNames)
    * This is for the case where processors have not yet been created when a store is added to the topology (used by KStream).
* KStream
  * add stateStoreNames to process(), transform(), transformValues().
* Refactored existing state stores to implement StateStoreSupplier

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #387 from ymatsuda/state_store_supplier
2015-11-02 13:24:48 -08:00
Yasuhiro Matsuda 13c3e049fb HOTFIX: correct sourceNodes for kstream.through()
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #374 from ymatsuda/fix_through_operator
2015-10-27 16:26:47 -07:00
Yasuhiro Matsuda af42c37899 HOTFIX: call consumer.poll() even when no task is assigned
StreamThread should keep calling consumer.poll() even when no task is assigned. This is necessary to get a task.

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #373 from ymatsuda/no_task
2015-10-27 13:57:19 -07:00
Yasuhiro Matsuda 38a1b60553 HOTFIX: fix off-by-one stream offset commit
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #372 from ymatsuda/commit_offset
2015-10-27 13:49:19 -07:00
Yasuhiro Matsuda e6f9b9e473 KAFKA-2694: Reformat task id as group id and partition id
guozhangwang

* A task id is now a class, ```TaskId```, that has topic group id and partition id fields.
* ```TopologyBuilder``` assigns a topic group id to a topic group. Related methods are changed accordingly.
* A state store uses the partition id part of the task id as the change log partition id.
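
A minimal sketch of such a two-part task id follows. The class name `TaskIdSketch`, the `group_partition` string form, and the `parse` and `changelogPartition` helpers are assumptions for illustration; only the two fields and the reuse of the partition id for the change log come from the description above.

```java
// Hypothetical value class; the "group_partition" string form is an assumption.
final class TaskIdSketch implements Comparable<TaskIdSketch> {
    final int topicGroupId;
    final int partition;

    TaskIdSketch(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    /** Parses the assumed "group_partition" form, e.g. "1_3". */
    static TaskIdSketch parse(String text) {
        int sep = text.indexOf('_');
        if (sep < 0) {
            throw new IllegalArgumentException("not a task id: " + text);
        }
        return new TaskIdSketch(
                Integer.parseInt(text.substring(0, sep)),
                Integer.parseInt(text.substring(sep + 1)));
    }

    /** The partition id part doubles as the change log partition of the task's state store. */
    int changelogPartition() {
        return partition;
    }

    @Override
    public String toString() {
        return topicGroupId + "_" + partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TaskIdSketch)) {
            return false;
        }
        TaskIdSketch other = (TaskIdSketch) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return 31 * topicGroupId + partition;
    }

    @Override
    public int compareTo(TaskIdSketch other) {
        int byGroup = Integer.compare(topicGroupId, other.topicGroupId);
        return byGroup != 0 ? byGroup : Integer.compare(partition, other.partition);
    }
}
```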

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #365 from ymatsuda/task_id
2015-10-26 20:14:19 -07:00
Yasuhiro Matsuda 71399ffe4c KAFKA-2652: integrate new group protocol into partition grouping
guozhangwang

* added ```PartitionGrouper``` (abstract class)
 * This class is responsible for grouping partitions. Each group forms a task.
 * Users may implement this class for custom grouping.
* added ```DefaultPartitionGrouper```
 * our default implementation of ```PartitionGrouper```
* added ```KafkaStreamingPartitionAssignor```
 * We always use this as ```PartitionAssignor``` of stream consumers.
 * Actual grouping is delegated to ```PartitionGrouper```.
* ```TopologyBuilder```
 * added ```topicGroups()```
   * This returns groups of related topics according to the topology
 * added ```copartitionSources(sourceNodes...)```
   * This is used by DSL layer. It asserts the specified source nodes must be copartitioned.
 * added ```copartitionGroups()```
   * This returns groups of copartitioned topics
* KStream layer
 * keep track of source nodes to determine copartition sources when streams are joined
 * source nodes are set to null when partitioning property is not preserved (e.g. ```map()```, ```transform()```), and this indicates the stream is no longer joinable

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #353 from ymatsuda/grouping
2015-10-26 13:33:51 -07:00
Randall Hauch c553249b4e KAFKA-2594: Add InMemoryLRUCacheStore as a preliminary method for bounding in-memory stores
Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps a maximum number of entries in-memory, and as the size exceeds the capacity the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of the other tests, and can be used by other libraries to help test their own custom implementations.
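
The eviction behavior described here can be sketched with a `java.util.LinkedHashMap` in access order. This is not the `InMemoryLRUCacheStore` code: the class `LruStoreSketch` and its `onEvict` callback are hypothetical, with the callback standing in for whatever removes the evicted key from the backing topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: not the InMemoryLRUCacheStore from the commit.
final class LruStoreSketch<K, V> {
    private final Map<K, V> entries;

    LruStoreSketch(final int maxEntries, final Consumer<K> onEvict) {
        // accessOrder=true keeps iteration order from least to most recently used.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    // Hypothetical hook: a real store would also delete the key downstream.
                    onEvict.accept(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    V get(K key) {
        return entries.get(key); // a read counts as a use
    }

    void put(K key, V value) {
        entries.put(key, value); // may evict the least recently used entry
    }

    int size() {
        return entries.size();
    }
}
```

With a capacity of 2, putting `a` and `b`, reading `a`, and then putting `c` evicts `b`, because the read made `a` the more recently used entry.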

This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.

Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
* The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevented using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface to define this `recordCollector()` method, and change `ProcessorContextImpl` and `MockProcessorContext` to both implement this interface. Now, `MeteredKeyValueStore` can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.
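
The second fix amounts to casting to a narrow interface instead of a concrete class. A rough sketch of that pattern is below; every type in it is a simplified stand-in declared locally (the real `RecordCollector`, `ProcessorContext`, and `MockProcessorContext` have different shapes), and `collectorOf` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration; not the Kafka Streams types themselves.
final class CollectorAccessSketch {

    interface RecordCollector {
        void send(String topic, String key, String value);
    }

    /** Plays the role of RecordCollector.Supplier: anything that can hand out a collector. */
    interface RecordCollectorSupplier {
        RecordCollector recordCollector();
    }

    interface ProcessorContext {
    }

    /** A test context that records what was sent instead of producing to Kafka. */
    static final class MockContext implements ProcessorContext, RecordCollectorSupplier {
        final List<String> sent = new ArrayList<>();

        @Override
        public RecordCollector recordCollector() {
            return new RecordCollector() {
                @Override
                public void send(String topic, String key, String value) {
                    sent.add(topic + ":" + key + "=" + value);
                }
            };
        }
    }

    /** Casts to the narrow interface, so any context implementing it works. */
    static RecordCollector collectorOf(ProcessorContext context) {
        if (!(context instanceof RecordCollectorSupplier)) {
            throw new IllegalStateException("context does not supply a record collector");
        }
        return ((RecordCollectorSupplier) context).recordCollector();
    }
}
```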

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Edward Ribeiro, Guozhang Wang

Closes #256 from rhauch/kafka-2594
2015-10-16 14:46:26 -07:00
Yasuhiro Matsuda 50a076d1e9 MINOR: set up temp directories properly in StreamTaskTest
guozhangwang
StreamTaskTest did not set up a temp directory for each test. This occasionally caused interference between tests through state directory locking.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #317 from ymatsuda/fix_StreamTaskTest
2015-10-15 11:49:44 -07:00
Yasuhiro Matsuda c50d39ea82 KAFKA-2654: optimize unnecessary poll(0) away in StreamTask
guozhangwang
This change aims to remove unnecessary ```consumer.poll(0)``` calls.
* ```once``` after some partition is resumed
* whenever the size of the top queue in any task is below ```BUFFERED_RECORDS_PER_PARTITION_CONFIG```

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #315 from ymatsuda/less_poll_zero
2015-10-15 11:06:51 -07:00
Randall Hauch 6e571225d5 KAFKA-2593: Key value stores can use specified serializers and deserializers
Add support for the key value stores to use specified serializers and deserializers (aka, "serdes"). Prior to this change, the stores were limited to only the default serdes specified in the topology's configuration and exposed to the processors via the ProcessorContext.

Now, InMemoryKeyValueStore and RocksDBKeyValueStore are used in similar ways: both are parameterized on the key and value types, and both have a similar set of static factory methods. The static factory methods either take explicit key and value serdes, take key and value class types so the serdes can be inferred (only for the built-in serdes for string, integer, long, and byte array types), or use the default serdes on the ProcessorContext.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Guozhang Wang

Closes #255 from rhauch/kafka-2593
2015-10-14 13:59:10 -07:00
Yasuhiro Matsuda b1ce9494e3 MINOR: flush record collector after local state flush
guozhangwang
Fix the order of flushing. This undoes a change I made some time ago.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #304 from ymatsuda/flush_order
2015-10-13 15:31:27 -07:00
Yasuhiro Matsuda c67ca65889 MINOR: putting back kstream stateful transform methods
guozhangwang

* added back type safe stateful transform methods (kstream.transform() and kstream.transformValues())
* changed kstream.process() to void

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #292 from ymatsuda/transform_method
2015-10-09 16:28:40 -07:00
Randall Hauch 7233858bea KAFKA-2600: Align Kafka Streams' interfaces with Java 8 functional interfaces
A few of Kafka Streams' interfaces and classes are not well aligned with Java 8's functional interfaces. By making these changes, when Kafka moves to Java 8 these classes can extend standard Java 8 functional interfaces while remaining backward compatible. This will make it easier for developers to use Kafka Streams, and may allow us to eventually remove these custom interfaces and just use the standard Java 8 interfaces.

The changes include:

1. The 'apply' method of KStream's `Predicate` functional interface was renamed to `test` to match the method name on `java.util.function.BiPredicate`. This will allow KStream's `Predicate` to extend `BiPredicate` when Kafka moves to Java 8, and for the `KStream.filter` and `filterOut` methods to accept `BiPredicate`.
2. Renamed the `ProcessorDef` and `WindowDef` interfaces to `ProcessorSupplier` and `WindowSupplier`, respectively. Also the `SlidingWindowDef` class was renamed to `SlidingWindowSupplier`, and the `MockProcessorDef` test class was renamed to `MockProcessorSupplier`. The `instance()` method in each of these was renamed to `get()`, so that all of them can extend/implement Java 8's `java.util.function.Supplier<T>` interface in the future with no other changes and while remaining backward compatible. Variable names that used some form of "def" were changed to use "supplier".

These two sets of changes were made in separate commits.
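
To show why matching method names matter, here is a small sketch assuming a Java 8 compiler. The nested `Predicate` and `ProcessorSupplier` interfaces are simplified stand-ins for the Kafka Streams types, and the two adapter methods are hypothetical. Because the single methods are named `test` and `get`, they line up directly with `BiPredicate` and `Supplier`.

```java
import java.util.function.BiPredicate;
import java.util.function.Supplier;

// Simplified stand-ins for illustration; not the Kafka Streams interfaces themselves.
final class FunctionalAlignmentSketch {

    /** After the rename: test(key, value), the same shape as BiPredicate.test. */
    interface Predicate<K, V> {
        boolean test(K key, V value);
    }

    /** After the rename: get() instead of instance(), the same shape as Supplier.get. */
    interface ProcessorSupplier<P> {
        P get();
    }

    /** Hypothetical adapter: the method reference works because the signatures match. */
    static <K, V> BiPredicate<K, V> asBiPredicate(Predicate<K, V> predicate) {
        return predicate::test;
    }

    /** Hypothetical adapter from the custom supplier to the standard one. */
    static <P> Supplier<P> asSupplier(ProcessorSupplier<P> supplier) {
        return supplier::get;
    }
}
```

Once such interfaces extend their `java.util.function` counterparts, adapters like these become unnecessary, which is the backward-compatible migration path the message describes.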

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #270 from rhauch/kafka-2600
2015-10-09 12:19:07 -07:00
Yasuhiro Matsuda 5a921a3239 MINOR: typing ProcessorDef
guozhangwang
This code change properly types ProcessorDef. This also makes KStream.process() typesafe.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #289 from ymatsuda/typing_ProcessorDef
2015-10-08 23:19:12 -07:00
Guozhang Wang f236bc20ff HOTFIX: Persistent store in ProcessorStateManagerTest
ymatsuda junrao Could you take a quick look? The current unit test is failing on this.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma, Jun Rao

Closes #276 from guozhangwang/HF-ProcessorStateManager
2015-10-05 13:59:04 -07:00
Guozhang Wang 37f7d75e3d KAFKA-2591: Fix StreamingMetrics
Remove state storage upon unclean shutdown and fix streaming metrics used for local state.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Edward Ribeiro, Yasuhiro Matsuda, Jun Rao

Closes #265 from guozhangwang/K2591
2015-10-02 20:12:34 -07:00
Guozhang Wang 263c10ab7c KIP-28: Add a processor client for Kafka Streaming
This work has been contributed by Jesse Anderson, Randall Hauch, Yasuhiro Matsuda and Guozhang Wang. The detailed design can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client.

Author: Guozhang Wang <wangguoz@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro.matsuda@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Author: ymatsuda <yasuhiro.matsuda@gmail.com>
Author: Randall Hauch <rhauch@gmail.com>
Author: Jesse Anderson <jesse@smokinghand.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Jesse Anderson <eljefe6a@gmail.com>

Reviewers: Ismael Juma, Randall Hauch, Edward Ribeiro, Gwen Shapira, Jun Rao, Jay Kreps, Yasuhiro Matsuda, Guozhang Wang

Closes #130 from guozhangwang/streaming
2015-09-25 17:27:58 -07:00