Commit Graph

2811 Commits

Author SHA1 Message Date
Randall Hauch c553249b4e KAFKA-2594: Add InMemoryLRUCacheStore as a preliminary method for bounding in-memory stores
Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps a maximum number of entries in-memory, and as the size exceeds the capacity the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of the other tests, and can be used by other libraries to help test their own custom implementations.

This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.

Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
* The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevent using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface to define this `recordCollector()` method, and change `ProcessorContextImpl` and `MockProcessorContext` to both implement this interface. Now, `MeteredKeyValueStore` can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Edward Ribeiro, Guozhang Wang

Closes #256 from rhauch/kafka-2594
2015-10-16 14:46:26 -07:00
Yasuhiro Matsuda 50a076d1e9 MINOR: set up temp directories properly in StreamTaskTest
guozhangwang
StreamTaskTest did not set up a temp directory for each test. This occasionally caused interference between tests through state directory locking.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #317 from ymatsuda/fix_StreamTaskTest
2015-10-15 11:49:44 -07:00
Yasuhiro Matsuda c50d39ea82 KAFKA-2654: optimize unnecessary poll(0) away in StreamTask
guozhangwang
This change aims to remove unnecessary ```consumer.poll(0)``` calls.
* ```once``` after some partition is resumed
* whenever the size of the top queue in any task is below ```BUFFERED_RECORDS_PER_PARTITION_CONFIG```

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #315 from ymatsuda/less_poll_zero
2015-10-15 11:06:51 -07:00
Randall Hauch 6e571225d5 KAFKA-2593: Key value stores can use specified serializers and deserializers
Add support for the key value stores to use specified serializers and deserializers (aka, "serdes"). Prior to this change, the stores were limited to only the default serdes specified in the topology's configuration and exposed to the processors via the ProcessorContext.

Now, using InMemoryKeyValueStore and RocksDBKeyValueStore are similar: both are parameterized on the key and value types, and both have similar multiple static factory methods. The static factory methods either take explicit key and value serdes, take key and value class types so the serdes can be inferred (only for the built-in serdes for string, integer, long, and byte array types), or use the default serdes on the ProcessorContext.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Guozhang Wang

Closes #255 from rhauch/kafka-2593
2015-10-14 13:59:10 -07:00
Yasuhiro Matsuda b1ce9494e3 MINOR: flush record collector after local state flush
guozhangwang
Fix the order of flushing. Undoing the change I did sometime ago.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #304 from ymatsuda/flush_order
2015-10-13 15:31:27 -07:00
Yasuhiro Matsuda c67ca65889 MINOR: putting back kstream stateful transform methods
guozhangwang

* added back type safe stateful transform methods (kstream.transform() and kstream.transformValues())
* changed kstream.process() to void

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #292 from ymatsuda/transform_method
2015-10-09 16:28:40 -07:00
Randall Hauch 7233858bea KAFKA-2600: Align Kafka Streams' interfaces with Java 8 functional interfaces
A few of Kafka Stream's interfaces and classes are not as well-aligned with Java 8's functional interfaces. By making these changes, when Kafka moves to Java 8 these classes can extend standard Java 8 functional interfaces while remaining backward compatible. This will make it easier for developers to use Kafka Streams, and may allow us to eventually remove these custom interfaces and just use the standard Java 8 interfaces.

The changes include:

1. The 'apply' method of KStream's `Predicate` functional interface was renamed to `test` to match the method name on `java.util.function.BiPredicate`. This will allow KStream's `Predicate` to extend `BiPredicate` when Kafka moves to Java 8, and for the `KStream.filter` and `filterOut` methods to accept `BiPredicate`.
2. Renamed the `ProcessorDef` and `WindowDef` interfaces to `ProcessorSupplier` and `WindowSupplier`, respectively. Also the `SlidingWindowDef` class was renamed to `SlidingWindowSupplier`, and the `MockProcessorDef` test class was renamed to `MockProcessorSupplier`. The `instance()` method in all were renamed to `get()`, so that all of these can extend/implement Java 8's `java.util.function.Supplier<T>` interface in the future with no other changes and while remaining backward compatible. Variable names that used some form of "def" were changed to use "supplier".

These two sets of changes were made in separate commits.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #270 from rhauch/kafka-2600
2015-10-09 12:19:07 -07:00
Yasuhiro Matsuda 5a921a3239 MINOR: typing ProcessorDef
guozhangwang
This code change properly types ProcessorDef. This also makes KStream.process() typesafe.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #289 from ymatsuda/typing_ProcessorDef
2015-10-08 23:19:12 -07:00
Guozhang Wang f236bc20ff HOTFIX: Persistent store in ProcessorStateManagerTest
ymatsuda junrao Could you take a quick look? The current unit test is failing on this.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma, Jun Rao

Closes #276 from guozhangwang/HF-ProcessorStateManager
2015-10-05 13:59:04 -07:00
Guozhang Wang 37f7d75e3d KAFKA-2591: Fix StreamingMetrics
Remove state storage upon unclean shutdown and fix streaming metrics used for local state.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Edward Ribeiro, Yasuhiro Matsuda, Jun Rao

Closes #265 from guozhangwang/K2591
2015-10-02 20:12:34 -07:00
Guozhang Wang 263c10ab7c KIP-28: Add a processor client for Kafka Streaming
This work has been contributed by Jesse Anderson, Randall Hauch, Yasuhiro Matsuda and Guozhang Wang. The detailed design can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client.

Author: Guozhang Wang <wangguoz@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro.matsuda@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Author: ymatsuda <yasuhiro.matsuda@gmail.com>
Author: Randall Hauch <rhauch@gmail.com>
Author: Jesse Anderson <jesse@smokinghand.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Jesse Anderson <eljefe6a@gmail.com>

Reviewers: Ismael Juma, Randall Hauch, Edward Ribeiro, Gwen Shapira, Jun Rao, Jay Kreps, Yasuhiro Matsuda, Guozhang Wang

Closes #130 from guozhangwang/streaming
2015-09-25 17:27:58 -07:00