Commit Graph

3240 Commits

Author SHA1 Message Date
Nick Telford bef83ce89b
KAFKA-15541: Add iterator-duration metrics (#16028)
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

This new `StateStore` metric tracks the average and maximum amount of
time between creating and closing Iterators.

Iterators with very high durations can indicate to users performance
problems that should be addressed.

If a store reports no data for these metrics, despite the user opening
Iterators on the store, it suggests those iterators are not being
closed, and have therefore leaked.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-05-22 21:34:31 -07:00
Antoine Pourchet 06739d5aa0
KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo (#16024)
For task assignment purposes, the user needs to have a set of information available for each topic partition affecting the desired tasks.

This PR introduces a new interface for a read-only container class that allows all the important and relevant information to be found in one place.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-22 15:52:53 -07:00
Antoine Pourchet 27a6c156c4
KAFKA-15045: (KIP-924 pt. 7) Simplify requirements for rack aware graphs (#16004)
Rack aware graphs don't actually need any topology information about the system, but rather require a simple ordered (not sorted) grouping of tasks.

This PR changes the internal constructors and some interface signatures of RackAwareGraphConstructor and its implementations to allow reuse by future components that may not have access to the actual subtopology information.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-22 13:25:18 -07:00
Antoine Pourchet ef2c5e41a5
KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState (#15972)
This rack information is required to compute rack-aware assignments, which many of the current assigners do.

The internal ClientMetadata class was also edited to pass around this rack information.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-22 13:24:26 -07:00
Nick Telford 5552f5c26d
KAFKA-15541: Add num-open-iterators metric (#15975)
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

This new `StateStore` metric tracks the number of `Iterator` instances
that have been created, but not yet closed (via `AutoCloseable#close()`).

This will aid users in detecting leaked iterators, which can cause major
performance problems.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2024-05-21 23:29:50 -07:00
Antoine Pourchet 6339e3a6bf
KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure (#16002)
This PR creates the required methods to post-process the result of TaskAssignor.assign into the required ClientMetadata map. This allows most of the internal logic to remain intact after the user's assignment code runs.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-21 13:14:39 -07:00
Ayoub Omari 4cc99cbf3f
KAFKA-16343: Add unit tests of foreignKeyJoin classes (#15564)
Added unit tests of two processors included in foreignKey join : SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier.
Renamed ForeignTableJoinProcessorSupplierTest to SubscriptionJoinProcessorSupplierTest as that's the processor which the test class is testing.

Reviewers: Walker Carlson <wcarlson@apache.org>
2024-05-21 14:23:04 -05:00
Bruno Cadonna 69fc4c5da4
MINOR: Migrate tests in o.a.k.streams to JUnit 5 (except KafkaStreamsTest) (#15942)
Reviewers: Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-18 03:31:05 +08:00
Bruno Cadonna c58c21cc20
KAFKA-16774: Delete flaky test since it is redundant (#15978)
The test shouldCloseAllTaskProducersOnCloseIfEosEnabled() in
StreamThreadTest is flaky with a concurrent modification exception.
The concurrent modification exception is caused by the test itself
because it starts a stream thread and at the same time the thread
that executes the test calls methods on the stream thread. The stream
thread was not designed for such a concurrency.
The tests verifies that under EOS the streams producer are closed
during shutdown. Actually the test is not needed since we already
have a test that verifies that when the stream thread shuts down
also the task manager shuts down and for the tasks manager we have
tests that verify that the producers are closed when the task manager
shuts down.

This commit verifies that those tests are run under EOS and ALOS.

Reviewer: Matthias J. Sax <matthias@confluent.io>
2024-05-17 08:36:07 +02:00
Antoine Pourchet fafa3c76dc
KAFKA-15045: (KIP-924 pt. 4) Generify rack graph solving utilities (#15956)
The graph solving utilities are currently hardcoded to work with ClientState, but don't actually depend on anything in those state classes.

This change allows the MinTrafficGraphConstructor and BalanceSubtopologyGraphConstructor to be reused with KafkaStreamsStates instead.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Almog Gavra <almog@responsive.dev>
2024-05-16 11:37:59 -07:00
Bruno Cadonna ba19eedb90
KAFKA-7342: Migrate tests in remaining packages in o.a.k.streams (#15963)
Migrates tests in the following packages (excluding subpackages)
to JUnit 5:
- org.apache.kafka.streams.internals
- org.apache.kafka.streams.kstream
- org.apache.kafka.streams.processor
- org.apache.kafka.streams.query
- org.apache.kafka.streams.state
- org.apache.kafka.streams.tests
- org.apache.kafka.streams.utils

Reviewer: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-15 20:28:34 +02:00
Bruno Cadonna d2e6c86632
KAFKA-10199: Remove queue-based remove from state updater (#15896)
Removes the unused remove operation from the state updater
that asynchronously removed tasks and put them into an
output queue.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-15 09:48:33 +02:00
Mickael Maison 34ec3fac15
MINOR: Fix warnings in streams javadoc (#15955)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-15 12:51:47 +08:00
Antoine Pourchet cb968845ec
KAFKA-15045: (KIP-924 pt. 3) Implement KafkaStreamsAssignment (#15944)
This PR changes KafkaStreamsAssignment from an interface to a container class, and implements said class.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-14 17:33:52 -07:00
Antoine Pourchet 0c5e8d3966
KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState (#15920)
This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes those within StreamsPartitionAssignor#assign.

New internal methods were also added to the ClientState to easily pass this data through to the KafkaStreamsState.

One test was added to check the lag sorting within the implementation of KafkaStreamsState, which is the counterpart to the test that existed for the ClientState class.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-14 17:29:54 -07:00
Walker Carlson 57d30d3450
KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException (#15919)
KStreams is able to handle the ProducerFenced (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance amongst the worker threads to rejoin the group. The producer is also recreated. Due to how streams works (writing to and reading from various topics), the application is able to figure out the last thing the fenced producer completed and continue from there. InvalidPidMappingException should be treated the same way.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Justine Olshan <jolshan@confluent.io>
2024-05-14 11:07:55 -05:00
Bruno Cadonna 5439914c32
KAFKA-10199: Shutdown with new remove operation in state updater (#15894)
Uses the new remove operation of the state updater that returns
a future to shutdown the task manager.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-13 14:33:58 +02:00
Bruno Cadonna cfffe4e2e8
KAFKA-10199: Handle assignment with new remove operation in state updater (#15882)
Uses the new remove operation of the state updater that returns
a future to handle task assignment.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-13 11:12:30 +02:00
Christo Lolov 4bece0131f
KAFKA-14133 Move StreamTaskTest to Mockito (#14716)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-10 11:07:06 +08:00
Lucia Cerchie 31528f581d
KAFKA-15307: update/note deprecated configs (#14360)
Configs default.windowed.value.serde.inner and default.windowed.key.serde.inner
were replace with windowed.inner.class.serde. This PR updates the docs accordingly,
plus a few more side cleanups.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-05-09 19:46:00 -07:00
ChickenchickenLove ff6d01c90f
KAFKA-15951: MissingSourceTopicException should include topic names (#15573)
MissingSourceTopicException should contain the name of the missing topic.
There is one corner case for which we don't have the topic name at hand, but we can log the topic
name somewhere else.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2024-05-09 19:35:36 -07:00
Antoine Pourchet 8fd6596454
KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. (#15887)
This is the first PR in a sequence to support custom task assignors in Kafka Streams, which was described in KIP 924. It creates and exposes all of the interfaces that will need to be implemented during the refactor of the current task assignment logic.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-05-09 19:14:53 -07:00
Ayoub Omari 29f3260a9c
MINOR: Fix streams javadoc links (#15900)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-09 10:52:49 +08:00
Bruno Cadonna f7b242f94e
KAFKA-10199: Revoke tasks from state updater with new remove (#15871)
Uses the new remove operation of the state updater that returns
a future to remove revoked tasks from the state updater.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-08 09:53:58 +02:00
Bruno Cadonna cb35ddc5ca
KAFKA-10199: Remove lost tasks in state updater with new remove (#15870)
Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state udpater.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-07 14:26:23 +02:00
Matthias J. Sax d76352e215
MINOR: log newly created processId (#15851)
Reviewers: Colt McNealy <colt@littlehorse.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-07 14:14:35 +08:00
Bruno Cadonna 366aeab488
KAFKA-10199: Add remove operation with future to state updater (#15852)
Adds a remove operation to the state updater that returns a future
instead of adding the removed tasks to an output queue. Code that
uses the state updater can then wait on the future.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-06 11:27:40 +02:00
Chia Chuan Yu 55a00be4e9
MINOR: Replaced Utils.join() with JDK API. (#15823)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-06 15:13:01 +08:00
Bruno Cadonna 240243b91d
KAFKA-10199: Accept only one task per element in output queue for failed tasks (#15849)
Currently, the state updater writes multiple tasks per exception in the output
queue for failed tasks. To add the functionality to remove tasks synchronously
from the state updater, it is simpler that each element of the output queue for
failed tasks holds one single task.

This commit refactors the class that holds exceptions and failed tasks
in the state updater -- i.e., ExceptionAndTasks -- to just hold one single
task.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-05-03 10:52:12 +02:00
Matthias J. Sax 49587777c1
MINOR: fix timeouts of EosIntegrationTest (#15811)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-26 15:04:05 +08:00
Gaurav Narula 025f9816f1
MINOR: fix javadoc warnings (#15527)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-26 08:31:52 +08:00
TingIāu "Ting" Kì 864744ffd4
KAFKA-16610 Replace "Map#entrySet#forEach" by "Map#forEach" (#15795)
Reviewers: Apoorv Mittal <amittal@confluent.io>, Igor Soarez <soarez@apple.com>
2024-04-25 01:52:24 +01:00
Omnia Ibrahim 1b301b3020
KAFKA-15853 Move socket configs into org.apache.kafka.network.SocketServerConfigs (#15772)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-23 17:39:36 +08:00
Gyeongwon, Do e6b6b6c1c2
MINOR: Remove extra "}" logged in KafkaStreams close (#15783)
Reviewers: Igor Soarez <soarez@apple.com>
2024-04-23 09:45:36 +01:00
Omnia Ibrahim ecb2dd4cdc
KAFKA-15853 Move KafkaConfig log properties and docs out of core (#15569)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Nikolay <nizhikov@apache.org>, Federico Valeri <fvaleri@redhat.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-04-20 04:14:23 +08:00
Matthias J. Sax 5783159175
MINOR: disable internal result emit throttling in TTD (#15660)
Kafka Streams DSL operators use internal wall-clock based throttling
parameters for performance reasons. These configs make the usage of TTD
difficult: users need to advance the mocked wall-clock time in
their test code, or set these internal configs to zero.

To simplify testing, TDD should disable both configs automatically.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-04-18 18:49:10 -07:00
Calvin Liu 53ff1a5a58
KAFKA-15585: DescribeTopicPartitions client side change. (#15470)
Add the support for DescribeTopicPartitions API to AdminClient. For this initial implementation, we are simply loading all of the results into memory on the client side. 

Reviewers: Andrew Schofield <aschofield@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
2024-04-18 12:09:14 -04:00
Mickael Maison aee9724ee1
MINOR: Remove unneeded explicit type arguments (#15736)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-17 21:55:58 +02:00
Ayoub Omari e63efbc5d7
MINOR: fix duplicated return and other streams docs typos (#15713)
Reviewers: Johnny Hsu <johnnyhsu@fb.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-04-17 21:26:04 +08:00
Omnia Ibrahim 363f4d2847
KAFKA-15853 Move consumer group and group coordinator configs out of core (#15684)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-17 20:41:22 +08:00
Omnia Ibrahim 8c0458861c
KAFKA-15853 Move KafkaConfig Replication properties and docs out of … (#15575)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-16 15:28:35 +08:00
Omnia Ibrahim 61baa7ac6b
KAFKA-15853 Move transactions configs out of core (#15670)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-13 00:29:51 +08:00
Ayoub Omari 753cc4b7c8
MINOR: Update docs link to deprecated method TimeWindows.grace (#15700)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-12 11:16:10 +08:00
Bruno Cadonna 5c855be015
MINOR: Remove dead code of metric forward-rate (#15686)
Kafka Streams announced the removal  of metric forward-rate in
KIP-444 and removed it completely in AK 3.0. However, we forgot
to remove some code for this metric.
This commit removes the code to create the metric forward-rate.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax <matthias@confluent.io>
2024-04-10 09:28:11 +02:00
Victor van den Hoven ee61bb721e
KAFKA-15417: move outerJoinBreak-flags out of the loop (#15510)
Follow up PR for https://github.com/apache/kafka/pull/14426 to fix a bug introduced by the previous PR.

Cf https://github.com/apache/kafka/pull/14426#discussion_r1518681146

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-04-02 06:46:54 -07:00
Walker Carlson 8b274d8c1b
KAFKA-7663: Reprocessing on user added global stores restore (#15414)
When custom processors are added via StreamBuilder#addGlobalStore they will now reprocess all records through the custom transformer instead of loading directly.

We do this so that users that transform the records will not get improperly formatted records down stream.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-03-28 10:30:18 -05:00
Nikolay 355873aa54
MINOR: Use CONFIG suffix in ZkConfigs (#15614)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
Co-authored-by: n.izhikov <n.izhikov@vk.team>
2024-03-28 15:52:34 +01:00
Nikolay 6f38fe5e0a
KAFKA-14588 ZK configuration moved to ZkConfig (#15075)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-27 22:37:01 +08:00
PoAn Yang 6f8d4fe26b
KAFKA-15949: Unify metadata.version format in log and error message (#15505)
There were different words for metadata.version like metadata version or metadataVersion. Unify format as metadata.version.

Reviewers: Luke Chen <showuon@gmail.com>
2024-03-26 20:09:29 +08:00
Christo Lolov 997ca14f80
KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
This pull requests migrates the StateDirectory mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>
2024-03-22 10:43:53 +01:00
Kuan-Po (Cooper) Tseng 12a1d85362
KAFKA-12187 replace assertTrue(obj instanceof X) with assertInstanceOf (#15512)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-20 10:36:25 +08:00
Matthias J. Sax 313574e329
MINOR: fix flaky EosIntegrationTest (#15494)
Bumping some timeout due to slow Jenkins build.

Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-03-16 18:06:45 -07:00
A. Sophie Blee-Goldman 96bfac4216
MINOR: Update javadocs and exception string in "deprecated" ProcessorRecordContext#hashcode (#15508)
This PR updates the javadocs for the "deprecated" hashCode() method of ProcessorRecordContext, as well as the UnsupportedOperationException thrown in its implementation, to actually explain why the class is mutable and therefore unsafe for use in hash collections. They now point out the mutable field in the class (namely the Headers)

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>
2024-03-14 23:08:39 -07:00
Matthias J. Sax 612a1fe1bb
MINOR: Kafka Streams docs fixes (#15517)
- add missing section to TOC
- add default value for client.id

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <bruno@confluent.io>
2024-03-13 21:54:06 -07:00
Cheryl Simmons 2c613b2d42
MINOR: Tweak streams config doc (#15518)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-03-12 15:21:43 -07:00
Christo Lolov 8b72a2c72f
KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 3 (#15497)
The previous pull request in this series was #15261.

This pull request continues the migration of the consumer mock in TaskManagerTest test by test for easier reviews.

The next pull request in the series will be #15254 which ought to complete the Mockito migration for the TaskManagerTest class

Reviewer: Bruno Cadonna <cadonna@apache.org>
2024-03-11 12:51:20 +01:00
Matthias J. Sax 2fcafbd497 HOTFIX: fix html markup 2024-03-08 14:28:39 -08:00
Matthias J. Sax 861fe68cee TRIVIAL: fix typo 2024-03-08 13:52:33 -08:00
Daan Gerits b9a5b4a805
KAFKA-10892: Shared Readonly State Stores ( revisited ) (#12742)
Implements KIP-813.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2024-03-08 10:57:56 -08:00
Matthias J. Sax d8dd068a62
KAFKA-15964: fix flaky StreamsAssignmentScaleTest (#15485)
This PR bumps some timeouts due to slow Jenkins builds.

Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-03-07 09:17:52 -08:00
Christo Lolov a33c47ea4d
KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 (#15261)
The previous pull request in this series was #15112.

This pull request continues the migration of the consumer mock in TaskManagerTest test by test for easier reviews.

I envision there will be at least 1 more pull request to clean things up. For example, all calls to taskManager.setMainConsumer should be removed.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2024-03-07 10:33:31 +01:00
Matthias J. Sax ccf4bd5f46
MINOR: Add 3.7 to Kafka Streams system tests (#15443)
Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-03-06 12:02:58 -08:00
Matthias J. Sax 6d7b25bb25
KAFKA-15797: Fix flaky EOS_v2 upgrade test (#15449)
Originally, we set commit-interval to MAX_VALUE for this test,
to ensure we only commit expliclity. However, we needed to decrease it
later on when adding the tx-timeout verification.

We did see failing test for which commit-interval hit, resulting in
failing test runs. This PR increase the commit-interval close to
test-timeout to avoid commit-interval from triggering.

Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-03-06 10:27:12 -08:00
Victor van den Hoven e81379d3fe
KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit non-joined items (#14426)
Kafka Streams support asymmetric join windows. Depending on the window configuration
we need to compute window close time etc differently.

This PR flips `joinSpuriousLookBackTimeMs`, because they were not correct, and
introduced the `windowsAfterIntervalMs`-field that is used to find if emitting records can be skipped.

Reviewers: Hao Li <hli@confluent.io>, Guozhang Wang <guozhang.wang.us@gmail.com>, Matthias J. Sax <matthias@confluent.io>
2024-03-05 17:06:20 -08:00
Nikolay eea369af94
KAFKA-14588 Log cleaner configuration move to CleanerConfig (#15387)
In order to move ConfigCommand to tools we must move all it's dependencies which includes KafkaConfig and other core classes to java. This PR moves log cleaner configuration to CleanerConfig class of storage module.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-05 18:11:56 +08:00
Ayoub Omari 4f92a3f0af
KAFKA-14747: record discarded FK join subscription responses (#15395)
A foreign-key-join might drop a "subscription response" message, if the value-hash changed.
This PR adds support to record such event via the existing "dropped records" sensor.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-03-04 15:56:40 -08:00
Ayoub Omari 7dbdc15c66
KAFKA-15625: Do not flush global state store at each commit (#15361)
Global state stores are currently flushed at each commit, which may impact performance, especially for EOS (commit each 200ms).
The goal of this improvement is to flush global state stores only when the delta between the current offset and the last checkpointed offset exceeds a threshold.
This is the same logic we apply on local state store, with a threshold of 10000 records.
The implementation only flushes if the time interval elapsed and the threshold of 10000 records is exceeded.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2024-03-04 10:19:59 +01:00
Ayoub Omari 907e945c0b
MINOR: fix SessionStore java doc (#15412)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-04 01:03:04 +08:00
Almog Gavra 1c9f360f4a
KAFKA-15215: migrate StreamedJoinTest to Mockito (#15424)
Migrate StreamedJoinTest to Mockito

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, 
Divij Vaidya <diviv@amazon.com>
2024-02-26 18:52:25 -08:00
Daan Gerits 06392f7ae2
MINOR: Update of the PAPI testing classes to the latest implementation (#12740)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-02-22 18:15:24 -08:00
Justine Olshan 661e41c92f
KAFKA-16302: Remove check for log message that is no longer present (fix builds) (#15422)
a3528a3 removed this log but not the test asserting it.

Builds are currently red because for some reason these tests can't retry. We should address that as a followup.

Reviewers:  Greg Harris <greg.harris@aiven.io>,  Matthias J. Sax <matthias@confluent.io>
2024-02-22 17:10:11 -08:00
Matthias J. Sax a3528a316f
MINOR: remove unnecessary logging (#15396)
We already record dropping record via metrics and logging at WARN level
is too noise. This PR removes the unnecessary logging.

Reviewers: Kalpesh Patel <kpatel@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2024-02-21 08:01:11 -08:00
Lucas Brutschy fcbfd3412e
KAFKA-16284: Fix performance regression in RocksDB (#15393)
A performance regression introduced in commit 5bc3aa4 reduces the write performance in RocksDB by ~3x. The bug is that we fail to pass the WriteOptions that disable the write-ahead log into the DB accessor.

For testing, the time to write 10 times 1 Million records into one RocksDB each were measured:

Before 5bc3aa4: 7954ms, 12933ms
After 5bc3aa4: 30345ms, 31992ms
After 5bc3aa4 with this fix: 8040ms, 10563ms
On current trunk with this fix: 9508ms, 10441ms

Reviewers: Bruno Cadonna <bruno@confluent.io>, Nick Telford <nick.telford@gmail.com>
2024-02-21 09:01:53 +01:00
Matthias J. Sax 4c70581eb6
KAFKA-15770: IQv2 must return immutable position (#15219)
ConsistencyVectorIntegrationTest failed frequently because the return
Position from IQv2 is not immutable while the test assume immutability.
To return a Position with a QueryResult that does not change, we need to
deep copy the Position object.

Reviewers: John Roesler <john@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2024-02-20 12:24:32 -08:00
Minha, Jeong 553f45bca8
MINOR: Fix toString method of IsolationLevel (#14782)
Reviewers: Matthias J. Sax <matthias@confluent.io>, Ashwin Pankaj <apankaj@confluent.io>
2024-02-15 19:07:18 -08:00
Mickael Maison 0bf830fc9c
KAFKA-14576: Move ConsoleConsumer to tools (#15274)
Reviewers: Josep Prat <josep.prat@aiven.io>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
2024-02-13 19:24:07 +01:00
Owen Leung d233eb98f7
KAFKA-14957: Update-Description-String (#13909)
HTML code for configs is auto-generated and for Kafka Streams config `state.dir` produces a confusing default value.
This PR adds a new property `alternativeString` to set a "default" value which should be rendered in HTML instead of the actual default value.

Reviewers: Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, @eziosudo <eziosudo@gmail.com>, Matthias J. Sax <matthias@confluent.io>
2024-02-10 12:46:51 -08:00
Kohei Nozaki 87c7fc0cd7
KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders (#15121)
This PR replaces a HashMap by a ConcurrentHashMap so that the local state store queries can be made from multiple threads. See this for additional context: https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
2024-02-07 13:02:42 -08:00
Greg Harris 5f35b41e92
KAFKA-15834: Remove more leaky NamedTopologyIntegrationTest tests (#15243)
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
2024-02-05 11:39:47 -08:00
Lucas Brutschy a101d20c40
KAFKA-16220: Increase timeout in KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest (#15307)
The test is flaky, since sometimes one of the threads haven't processed a single record that
cause the ERROR state in test run (due to unlucky rebalancing).

Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-02-02 15:07:53 +01:00
Matthias J. Sax 93712eca15
KAFKA-15594: Add version 3.6 to Kafka Streams system tests (#15151)
Reviewers: Walker Carlson <wcarlson@confluent.io>
2024-01-26 14:59:24 -08:00
Matthias J. Sax aaccf542d1
KAFKA-16141: Fix StreamsStandbyTask system test (#15217)
KAFKA-15629 added `TimestampedByteStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter` which break the restore
code path and thus some system tests.

This PR reverts this change for now.

Reviewers: Almog Gavra <almog.gavra@gmail.com>, Walker Carlson <wcarlson@confluent.io>
2024-01-19 09:23:42 -08:00
Matthias J. Sax 2dc3fff14a
KAFKA-16139: Fix StreamsUpgradeTest (#15207)
Adds version 3.6 to the possible values for config upgrade_from.

Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-01-17 13:28:12 -08:00
Lucas Brutschy 26465c6409
KAFKA-16097: Disable state updater in trunk (#15204)
Several problems are still appearing while running 3.7 with
the state updater. This change will disable the state updater
by default also in trunk.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2024-01-17 15:25:26 +01:00
Bruno Cadonna e563aad4ee
KAFKA-16139: Fix StreamsUpgradeTest (#15199)
Adds version 3.5 to the possible values for config upgrade_from.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-01-16 17:25:33 -08:00
Greg Harris 055ff2b831
KAFKA-15834: Remove NamedTopologyIntegrationTest case which leaks clients (#15185)
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>, Matthias J. Sax <matthias@confluent.io>
2024-01-16 10:20:39 -08:00
Nick Telford 8904317518
KAFKA-16089: Fix memory leak in RocksDBStore (#15174)
`ColumnFamilyDescriptor` is _not_ a `RocksObject`, which in theory means
it's not backed by any native memory allocated by RocksDB.

However, in practice, `ColumnFamilyHandle#getDescriptor()`, which
returns a `ColumnFamilyDescriptor`, allocates an internal
`rocksdb::db::ColumnFamilyDescriptor`, copying the name and handle of
the column family into it.

Since the Java `ColumnFamilyDescriptor` is not a `RocksObject`, it's not
possible to track this allocation and free it from Java.

Fortunately, in our case, we can simply avoid calling
`ColumnFamilyHandle#getDescriptor()`, since we're only interested in the
column family name, which is already available on
`ColumnFamilyHandle#getName()`, which does not leak memory.

We can also optimize away the temporary `Options`, which was previously a
source of memory leaks, because `userSpecifiedOptions` is an instance of
`Options`.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-11 18:09:54 +01:00
Divij Vaidya 65424ab484
MINOR: New year code cleanup - include final keyword (#15072)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Sagar Rao <sagarmeansocean@gmail.com>
2024-01-11 17:53:35 +01:00
Bruno Cadonna fbbfafe1f5
KAFKA-16098: Verify pending recycle action when standby is re-assigned (#15168)
When a standby is recycled to an active and then re-assigned as
a standby again, it might happen that the recycling is still
pending when the standby is reassigned. That causes an illegal
state exception from the main consumer since the active task
that results from the recycling is actually not assigned to
the main consumer anymore, but it was re-assigned as a
standby in the most recent rebalance.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-10 17:59:06 +01:00
Christo Lolov 5a0a4c5a54
MINOR: Address occasional UnnecessaryStubbingException in StreamThreadTest (#15134)
Reviewers: Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>
2024-01-10 14:39:59 +01:00
Christo Lolov ee96935c60
KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito (#15112)
Reviewers: Divij Vaidya <diviv@amazon.com>
2024-01-10 14:34:03 +01:00
Lucas Brutschy 0349f2310c
KAFKA-16097: Add suspended tasks back to the state updater when reassigned (#15163)
When a partition is revoked, the corresponding task gets a pending action
"SUSPEND". This pending action may overwrite a previous pending action.

If the task was previously removed from the state updater, e.g. because
we were fenced, the pending action is overwritten with suspend, and in
handleAssigned, upon reassignment of that task, then SUSPEND action is
removed.

Then, once the state updater executes the removal, no pending action
is registered anymore, and we run into an IllegalStateException.

This commit solves the problem by adding back reassigned tasks to the
state updater, since they may have been removed from the state updater
for another reason than being restored completely.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2024-01-10 10:21:38 +01:00
sanepal f1a0207cbb
KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories (#15088)
KAFKA-16025 describes the race condition sequence in detail. When this occurs, it can cause the impacted task's initializing to block indefinitely, blocking progress on the impacted task, and any other task assigned to the same stream thread. The fix I have implemented is pretty simple, simply re-check whether a directory is still empty after locking it during the start of rebalancing, and if it is, unlock it immediately. This preserves the idempotency of the method when it coincides with parallel state store cleanup executions.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-01-08 14:49:48 -08:00
Lucas Brutschy c0b6493455
KAFKA-16077: Streams with state updater fails to close task upon fencing (#15117)
* KAFKA-16077: Streams fails to close task after restoration when input partitions are updated

There is a race condition in the state updater that can cause the following:

 1. We have an active task in the state updater
 2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
 3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition
 4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten)
 5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized

This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.

To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it.

We should not remove the task twice, otherwise, we'll end up in this situation

1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. The state updater moves the task from the updating tasks to the removed tasks
4. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back (adding a task+remove into the task and action queue) and add a pending action to close, revive and update input partitions
5. The task is handed back by the state updater. We close revive and update input partitions, and add the task back to the state updater
6. The state updater executes the "task+remove" action that is still in its task + action queue, and hands the task immediately back to the main thread
7. The main thread discoveres a removed task that was not restored and has no pending action attached to it. IllegalStateException

Reviewers: Bruno Cadonna <cadonna@apache.org>
2024-01-05 19:32:33 +01:00
Nick Telford cce63274f2
KAFKA-16086: Fix memory leak in RocksDBStore (#15135)
We allocate an `Options` in order to list column families while opening
the `RocksDBStore`, but never explicitly `close()` it.

`Options` is a RocksDB native object, which needs to be explicitly
closed to free the resources it allocates in native memory.

Failing to close this causes a memory leak when repeatedly
opening/closing stores.

It's an `AutoCloseable`, and all usage of it is confined to the
surrounding `try` block, so we can just hook it out to the `try` to
auto-close it when done.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-05 15:40:00 +01:00
Christo Lolov c703ce2563
KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito (#15125)
Reviewers: Divij Vaidya <diviv@amazon.com>
2024-01-04 21:54:15 +01:00
Christo Lolov e6f2624c48
KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito (#15116)
Reviewers: Divij Vaidya <diviv@amazon.com>
2024-01-04 13:50:14 +01:00
Nick Telford 5bc3aa4280
KAFKA-14412: Decouple RocksDB access from CF (#15105)
To support future use-cases that use different strategies for accessing
RocksDB, we need to de-couple the RocksDB access strategy from the
Column Family access strategy.

To do this, we now have two separate accessors:

  * `DBAccessor`: dictates how we access RocksDB. Currently only one
    strategy is supported: `DirectDBAccessor`, which access RocksDB
    directly, via the `RocksDB` class for all operations. In the future, a
    `BatchedDBAccessor` will be added, which enables transactions via
    `WriteBatch`.
  * `ColumnFamilyAccessor`: maps StateStore operations to operations on
    one or more column families. This is a rename of the old
    `RocksDBDBAccessor`.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-04 11:42:30 +01:00
Matthias J. Sax c078e51c8f
MINOR: improve logging for state management (#15045)
Increase log level to INFO similar to other log statement in this class, to surface important information on the non-critical code path.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-04 02:21:52 -08:00
Christo Lolov 43a5ff2570
KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito (#15106)
This pull request takes a similar approach to how TaskManagerTest is being migrated to Mockito mock by mock for easier reviews.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-03 09:48:51 +01:00
Almog Gavra e6875f378c
KAFKA-16046: also fix stores for outer join (#15073)
This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
2024-01-02 15:07:46 -08:00
Lucas Brutschy e01eed32ab
KAFKA-9545: Fix IllegalStateException in updateLags (#15096)
We attempt to update lags when in state PENDING_SHUTDOWN or PARTITIONS_REVOKED. In these states,
however, our representation of the assignment may not be up-to-date with the subscription
object inside the consumer. This can cause a bug, in particular, when we subscribe to a
set of topics via a regular expression, and the underlying topic is deleted. The consumer
subscription may reflect that topic deletion already, while our internal state still
contains references to the deleted topic, because `onAssignment` has not yet been
executed. Therefore, we will attempt to call `currentLag` on partitions that are not
assigned to us any more inside the consumer, leading to an `IllegalStateException`.

This bug causes flakiness of the test
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2024-01-02 16:35:31 +01:00
Christo Lolov 65a28246ad
KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito (#14929)
Reviewers: Divij Vaidya <diviv@amazon.com>
2024-01-02 13:36:52 +01:00
Nikolay 45bd19f2ef
KAFKA-14588: Move ConfigType to server-common (#14867)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2023-12-22 18:35:27 +01:00
Almog Gavra 18a65b25c1
KAFKA-16046: fix stream-stream-join store types (#15061)
Before #14648, the KStreamImplJoin class would always create non-timestamped persistent windowed stores. After that PR, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.

To fix this, we allow creating non-timestamped stores using the DslWindowParams

Testing was done both manually as well as adding a unit test to ensure that the stores created are not timestamped. I also confirmed that the only place in the code persistentWindowStore was used before #14648 was in the StreamJoined code.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-22 15:23:58 +01:00
Divij Vaidya 6250049e10
KAFKA-13950: Fix resource leak in error scenarios (#12228)
We are not properly closing Closeable resources in the code base at multiple places especially when we have an exception. This code change fixes multiple of these leaks.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
2023-12-21 13:47:22 +01:00
Bruno Cadonna 19727f8d51
KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15044)
Kafka Streams checkpoints the wrong offset when a task is closed during
restoration. If under exactly-once processing guarantees a
TaskCorruptedException happens, the affected task is closed dirty, its
state content is wiped out and the task is re-initialized. If during
the following restoration the task is closed cleanly, the task writes
the offsets that it stores in its record collector to the checkpoint
file. Those offsets are the offsets that the task wrote to the changelog
topics. In other words, the task writes the end offsets of its changelog
topics to the checkpoint file. Consequently, when the task is
initialized again on the same Streams client, the checkpoint file is
read and the task assumes it is fully restored although the records
between the last offsets the task restored before closing clean and
the end offset of the changelog topics are missing locally.

The fix is to clear the offsets in the record collector on close.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-21 10:15:04 +01:00
Nick Telford b903d786ba
KAFKA-14412: Generalise over RocksDB WriteBatch (#14853)
* KAFKA-14412: Generalise over RocksDB WriteBatch

The type hierarchy of RocksDB's `WriteBatch` looks like this:

```
        +---------------------+
        | WriteBatchInterface |
        +---------------------+
                   ^
                   |
        +---------------------+
        |  AbstractWriteBatch |
        +---------------------+
                   ^
                   |
        +----------+----------+
        |                     |
 +------------+    +---------------------+
 | WriteBatch |    | WriteBatchWithIndex |
 +------------+    +---------------------+
```

By switching our `BatchWritingStore` methods from `WriteBatch` to
`WriteBatchInterface`, we enable the use of `WriteBatchWithIndex` as
well.

* Improve error reporting for unknown batch type

We should be using an `IllegalStateException`, and we should log a
message informing the user that this is a bug.

This branch should be unreachable, as both of the possible
implementations of `WriteBatchInterface` are matched in the previous two
branches.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2023-12-19 13:10:24 -08:00
Stanislav Kozlovski b352cc6b4e
MINOR: Bump trunk to 3.8.0-SNAPSHOT (#14993)
This patch bumps the next release version to 3.8.0-SNAPSHOT.

Following the Release Process, I created the 3.7 branch and am following the steps to bump these versions:

Modify the version in trunk to bump to the next one (eg. "0.10.1.0-SNAPSHOT") in the following files:

docs/js/templateData.js
gradle.properties
kafka-merge-pr.py
streams/quickstart/java/pom.xml
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
streams/quickstart/pom.xml
tests/kafkatest/__init__.py
2023-12-14 09:07:18 +01:00
Bruno Cadonna 87e3cbe4da
MINOR: Add junit properties to display parameterized test names (#14983)
In many parameterized tests, the display name is broken. Example - testMetadataFetch appears as [1] true, [2] false link
This is because the constant in @ParameterizedTest

String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";

This PR adds a new junit-platform.properties which overrides to add a {displayName} which shows the the display name of the method

For existing tests which override the name, should work as is. The precedence rules are explained

    name attribute in @ParameterizedTest, if present
    value of the junit.jupiter.params.displayname.default configuration parameter, if present
    DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest

Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names

Sample test run output
Before: [1] true link
After: testMetadataExpiry(boolean).false link

This commit is an extension of bdf6d46b41 which needed to reverted due to introduces test failures.

Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-12-13 09:42:18 +01:00
Matthias J. Sax 083aa61a96
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14936)
Part of KIP-714.

Add support to collect client instance id of the restore consumer.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-12 08:54:45 -08:00
Hao Li 85cee984ac
MINOR: Fix rack-aware assignment tests (#14965)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-11 01:38:57 -08:00
Matthias J. Sax f52575b172
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14948)
Part of KIP-714.

Adds support to expose producer client instance id.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-11 00:20:01 -08:00
Matthias J. Sax fb5d45d26e
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14935)
Part of KIP-714.

Add support to collect client instance id of the global consumer.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-12-08 09:42:32 -08:00
David Jacot b96ded9859
Revert "MINOR: Add junit properties to display parameterized test names (#14687)" (#14961)
This reverts commit bdf6d46b41. We found out that this commit introduced flakiness in Streams' tests. We will revise it.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2023-12-07 23:20:03 -08:00
Hanyu Zheng 5ba7bfaa57
KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. (#14907)
Update to KIP-992.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-07 15:29:29 -08:00
Matthias J. Sax 7dabd27f8d
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14922)
Part of KIP-714.

Adds support to expose main consumer client instance id.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-12-07 10:39:39 -08:00
Hanyu Zheng 9d2297ad2d
KAFKA-15527: Support ResultOrder to reverseRange and reverseAll query over kv-store in IQv2 (#14906)
Update to KIP-985.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-07 08:32:16 -08:00
Alieh Saeedi 6694ea424a
KAFKA-15347: fix unit test (#14947)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-06 19:53:43 -08:00
A. Sophie Blee-Goldman 0ca6e3e359
MINOR: fully encapsulate user restore listener in the DelegatingRestoreListener (#14886)
Minor cleanup to make it easier to follow the restore listener logic. Currently, the KafkaStreams class tracks two restore listener fields: there is a non-final, nullable "globalRestoreListener" that holds the restore listener specified by the user (if any), and then there is a final "delegatingRestoreListener" that's used to encapsulate the null checks for the user-specified restore listener. It's a bit confusing to follow along with what each of these restore listener fields is doing and the relationship between them when they're on equal footing like this, when in reality they're more hierarchical and the DelegatingRestoreListener is actually a wrapper over the user-specified globalRestoreListener. The term "global" is also a bit misleading as it can get mixed up with global state stores, when it's really meant to be "global" in the sense that it applies to all state stores in the application.

It would be nice to just move the user listener completely inside the DelegatingRestoreListener class and then make that class static, as well as renaming the field to "userRestoreListener"

Reviewers: Matthias J. Sax <mjsax@apache.org>
2023-12-06 10:38:54 -08:00
Alok Thatikunta bdf6d46b41
MINOR: Add junit properties to display parameterized test names (#14687)
In many parameterized tests, the display name is broken. Example - `testMetadataFetch` appears as `[1] true`, `[2] false`  [link](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.clients.producer/KafkaProducerTest/) 
This is because the constant in `@ParameterizedTest`
```java
String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";
```

This PR adds a new `junit-platform.properties` which overrides to add a `{displayName}` which shows the `the display name of the method`

For existing tests which override the name, should work as is. The precedence rules are explained

> 1. `name` attribute in `@ParameterizedTest`, if present
> 2. value of the `junit.jupiter.params.displayname.default` configuration parameter, if present
> 3. `DEFAULT_DISPLAY_NAME` constant defined in `@ParameterizedTest`

Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names

Sample test run output 
Before: `[1] true` [link](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.clients.producer/KafkaProducerTest/)
After: `testMetadataExpiry(boolean).false` [link](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14687/1/testReport/junit/org.apache.kafka.clients.producer/KafkaProducerTest/)

Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>, David Jacot <djacot@confluent.io>
2023-12-06 08:42:45 -08:00
Hao Li 6be2e5c131
KAFKA-15022: tests for HA assignor and StickyTaskAssignor (#14921)
Part of KIP-925.

Tests for HAAssignor and StickyAssignor.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-06 08:12:47 -08:00
Alieh Saeedi 9658942366
KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) (#14626)
Implements KIP-968.

Add new query type MultiVersionedKeyQuery for IQv2 supported by versioned state stores.
2023-12-06 07:56:12 -08:00
Eduwer Camacaro 83110e2d42
KAFKA-15448: Streams Standby Update Listener (KIP-988) (#14735)
Implementation for KIP-988, adds the new StandbyUpdateListener interface

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Colt McNealy <colt@littlehorse.io>
2023-12-06 01:27:38 -08:00
Hao Li f6560ab1cd
KAFKA-15022: introduce interface to control graph constructor (#14714)
Part of KIP-925.

Refactor graph construction and assignment in RackAwareAssignor to new interface.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-05 22:00:04 -08:00
Florin Akermann 4a958c6cb1
Kafka-14748: Relax non-null FK left-join requirement (#14107)
Relax non-null FK left-join requirement.

Testing Strategy: Inject extractor which returns null on first or second element.

Reviewers: Walker Carlson <wcarlson@apace.org>
2023-12-05 18:04:32 -06:00
Matthias J. Sax 45f5d0f621
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14908)
- Part of KIP-714
- Add new configs and public API for Kafka Streams
- Implement support to get admin client instance id

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Apoorv Mittal <amittal@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-12-05 12:19:56 -08:00
ashwinpankaj f2aeff0026
KAFKA-9545: Fix Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` (#14910)
RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted does not wait to ensure that test-topic-A is deleted. The second assignment condition times out in 15sec.

We should wait for the topic to be deleted (default timeout = 30sec) and then check the assignment.

Reviewers: Walker Carlson <wcarlson@apache.org>
2023-12-05 11:16:06 -06:00
Nick Telford 20a223061c
KAFKA-14412: Better Rocks column family management (#14852)
When opening RocksDB, we were checking for an error in
`RocksDBTimestampedStore` to detect if the `keyValueWithTimestamp` CF is
missing.

The `openRocksDB` method now supports any number of column families, not
just the extra one used by `RocksDBTimestampedStore`. We now check for
the existing column families _before_ opening the database, which allows
us to create any missing column families.

Supporting automatic creation of any number of missing column families
is a pre-requisite for KIP-892: Transactional StateStores.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
2023-12-05 10:02:04 +01:00
Christo Lolov d4c95cfc2a
KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito (#13932)
This pull request is an attempt to get what has started in #12524 to completion as part of the Streams project migration to Mockito.

Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
2023-12-04 18:37:57 +01:00
Lucas Brutschy 59ac9be21c
HOTFIX: fix ConsistencyVectorIntegrationTest failure (#14895)
#14570 changed the result for KeyQuery from ValueAndTimestamp<V> to
V, but forgot to update ConsistencyVectorIntegrationTest accordingly.
2023-12-03 23:06:41 +01:00
Matthias J. Sax 1a2f74be67 MINOR: fix typo 2023-12-01 15:39:32 -08:00
Matthias J. Sax b22bbd656c
MINOR: cleanup internal Iterator impl (#14889)
makeNext() is internal and visibility should not be extended to `public`

Reviewers: Walker Carlson <wcarlson@confluent.io>
2023-12-01 11:53:07 -08:00
Lucas Brutschy bfee3b3c6b
KAFKA-15690: Fix restoring tasks on partition loss, flaky EosIntegrationTest (#14869)
The following race can happen in the state updater code path

Task is restoring, owned by state updater
We fall out of the consumer group, lose all partitions
We therefore register a "TaskManager.pendingUpdateAction", to CLOSE_DIRTY
We also register a "StateUpdater.taskAndAction" to remove the task
We get the same task reassigned. Since it's still owned by the state updater, we don't do much
The task completes restoration
The "StateUpdater.taskAndAction" to remove will be ignored, since it's already restored
Inside "handleRestoredTasksFromStateUpdater", we close the task dirty because of the pending update action
We now have the task assigned, but it's closed.
To fix this particular race, we cancel the "close" pending update action. Furthermore, since we may have made progress in other threads during the missed rebalance, we need to add the task back to the state updater, to at least check if we are still at the end of the changelog. Finally, it seems we do not need to close dirty here, it's enough to close clean when we lose the task, related to KAFKA-10532.

This should fix the flaky EOSIntegrationTest.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2023-12-01 18:57:27 +01:00
Hanyu Zheng f1cd11dcc5
KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery (#14570)
Implements KIP-992.

Adds TimestampedKeyQuery and TimestampedRangeQuery (IQv2) for ts-ks-store, plus changes semantics of existing KeyQuery and RangeQuery if issues against a ts-kv-store, now unwrapping value-and-timestamp and only returning the plain value.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-11-30 12:14:23 -08:00
Nick Telford 96b43bf16f
KAFKA-14412: Add ProcessingThread tag interface (#14839)
This interface provides a common supertype for `StreamThread` and
`DefaultTaskExecutor.TaskExecutorThread`, which will be used by KIP-892
to differentiate between "processing" threads and interactive query
threads.

This is needed because `DefaultTaskExecutor.TaskExecutorThread` is
`private`, so cannot be seen directly from `RocksDBStore`.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-11-30 09:44:02 +01:00
Greg Harris 9f896ed6c9
KAFKA-15816: Fix leaked sockets in streams tests (#14769)
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Matthias J. Sax <mjsax@apache.org>
2023-11-29 11:53:34 -08:00
Hao Li e7b9bd5a26
KAFKA-15022: add config for balance subtopology in rack aware task assignment (#14711)
Part of KIP-925.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-11-29 11:33:52 -08:00
Hao Li 10555ec6de
KAFKA-15022: Only relax edge when path exist (#14198)
If there is no path from u to v, we should not represent it at Integer.MAX_VALUE but null instead.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-11-28 20:44:12 -08:00
Hao Li bbd75b80ce
KAFKA-15022: Detect negative cycle from one source (#14696)
Introduce a dummy node connected to every other node and run Bellman-ford from the dummy node once instead of from every node in the graph.

Reviewers: Qichao Chu (@ex172000), Matthias J. Sax <matthias@confluent.io>
2023-11-28 00:29:00 -08:00
Lucas Brutschy fe58cb1ebd
KAFKA-13531: Disable flaky NamedTopologyIntegrationTest (#14830)
Named topologies is a feature that is planned to be removed from AK with 4.0 and was never used via the public interface. It was used in a few versions of KSQL only, but was disabled there as well. While we do not want to remove it in 3.7 yet, we should disable flaky tests in that feature, that are disruptive to AK development.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2023-11-24 10:44:25 +01:00
Almog Gavra 9309653219
KAFKA-15215: [KIP-954] support custom DSL store providers (#14648)
Implementation for KIP-954: support custom DSL store providers

Testing Strategy:
- Updated the topology tests to ensure that the configuration is picked up in the topology builder
- Manually built a Kafka Streams application using a customer DslStoreSuppliers class and verified that it was used

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang.wang.us@gmail.com>
2023-11-21 13:51:39 -08:00
Bruno Cadonna 922d0d0e5c
MINOR: Do not check whether updating tasks exist in the waiting loop (#14791)
The state updater waits on a condition variable if no tasks exist that need to be updated. The condition variable is wrapped by a loop to account for spurious wake-ups. The check whether updating tasks exist is done in the condition of the loop. Actually, the state updater thread can change whether updating tasks exists, but since the state updater thread is waiting for the condition variable the check for the existence of updating tasks will always return the same value as long as the state updater thread is waiting. Thus, the check only need to be done once before entering the loop.

This commit moves check before the loop making also the usage of mocks more robust since the processing becomes more deterministic.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-11-18 21:10:21 +01:00
vamossagar12 e7f4f5dfe7
[MINOR] Removing unused variables from StreamThreadTest (#14777)
A few variables which aren't being used anymore but still exist. This commit removes those unused variables.

Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-11-16 11:00:11 +01:00
Alieh 0489b7cd33
KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)
This PR implements KIP-960 which add support for `VersionedKeyQuery`.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-11-15 17:34:54 -08:00
Kirk True 22f7ffe5e1
KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)
The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:

* `KafkaConsumer` (AKA "existing", "legacy" consumer)
* `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)

The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first:

* `LegacyKafkaConsumer`
* `AsyncKafkaConsumer`

`LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use.

This task is part of the work to implement support for the new KIP-848 consumer group protocol.

Reviewers: Philip Nee <pnee@confluent.io>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
2023-11-15 05:00:40 -08:00
Zihao Lin 7c562a776d
HOTFIX: Fix compilation error for JDK 21 caused by this-escape warning (#14740)
This patch fixes the compilation error for JDK 21 introduced in https://github.com/apache/kafka/pull/14708.

Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>
2023-11-12 08:59:40 +01:00
Almog Gavra 39cacca89b
KAFKA-15774: refactor windowed stores to use StoreFactory (#14708)
This is a follow up from #14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
2023-11-10 18:19:11 -08:00
Bruno Cadonna 81cceedf7e
MINOR: Delete task-level commit sensor (#14677)
The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR #8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor.
This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
2023-11-09 15:37:13 +01:00
Bruno Cadonna f1e58a35d7
MINOR: Do not checkpoint standbys when handling corrupted tasks (#14709)
When a task is corrupted, uncorrupted tasks are committed. That is also true for standby tasks. Committing standby tasks actually means that they are checkpointed.

When the state updater is enabled, standbys are owned by the state updater. The stream thread should not checkpoint them when handling corrupted tasks. That is not a big limitation since the state updater periodically checkpoints standbys anyway. Additionally, when handling corrupted tasks the important thing is to commit active running tasks to abort the transaction. Committing standby tasks do not abort any transaction.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-11-08 16:09:24 +00:00
Almog Gavra febf0fb573
KAFKA-15774: introduce internal StoreFactory (#14659)
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in #14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...).

It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes)

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2023-11-06 17:30:58 -08:00
gongzhongqiang d682b15eeb
KAFKA-15769: Fix logging with exception trace (#14683)
Reviewers: Divij Vaidya <diviv@amazon.com>, hudeqi <1217150961@qq.com>
2023-11-06 11:02:05 +01:00
Christo Lolov ba394aa28a
KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-11-06 10:41:37 +01:00
Christo Lolov 760abfbdab
KAFKA-14133: Move StreamsMetricsImplTest to Mockito (#14623)
Reviewers: Divij Vaidya <diviv@amazon.com>, Ismael Juma <ismael@juma.me.uk>
2023-11-01 12:13:06 +01:00
Florin Akermann b5c24974ae
Kafka 12317: Relax non-null key requirement in Kafka Streams (#14174)
[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)

The key requirments got relaxed for the followinger streams dsl operator:

left join Kstream-Kstream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
outer join Kstream-Kstream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.
left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.
left join KStream-Ktable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.

Reviewers: Walker Carlson <wcarlson@apache.org>
2023-10-31 11:09:42 -05:00
James Cheng b9f2874c44
MINOR: Fix typo in a comment at KTableFilter (#14665)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-10-30 10:16:12 +01:00
bachmanity1 f0e97397c0
KAFKA-14133: Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & GlobalStateStoreProviderTest (#14410)
Reviewers: Ismael Juma <ismael@juma.me.uk>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
2023-10-27 10:45:25 +02:00
Hanyu Zheng 834f72b03d
KAFKA-15527: Update docs and JavaDocs (#14600)
Part of KIP-985.

Updates JavaDocs for `RangeQuery` and `ReadOnlyKeyValueStore` with regard to ordering guarantees.
Updates Kafka Streams upgrade guide with KIP information.

Reviewer: Matthias J. Sax <matthias@confluent.io>
2023-10-26 17:48:28 -07:00
Levani Kokhreidze 986c1b1f31
KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test (#14608)
Trying to fix flakiness for the shouldInvokeUserDefinedGlobalStateRestoreListener test introduced in #14519.

Fixes are:

-Do not use static membership.
-Always close the 2nd KafkaStreams instance.
-Await for the Kafka Streams instance to transition to a RUNNING state before proceeding.
-Added logging for the StateRestoreListener implementation.
-Reduce restore consumer MAX_POLL_RECORDS.

Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
2023-10-26 14:56:33 -07:00
Matthias J. Sax a6c14003a9
HOTFIX: close iterator to avoid resource leak (#14624)
Reviewers: Hao Li <hli@confluent.io>, Bill Bejeck <bill@confluent.io>
2023-10-26 10:30:39 -07:00
Lucas Brutschy b061ab7701
MINOR: Fix misleading log-line (#14643)
After finishing restoration, we should only log the active tasks. Standby tasks are not part of restoration and it can be confusing to see them show up on this log message.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-10-26 08:31:46 -07:00
Lucas Brutschy d144b7ee38
KAFKA-15326: [10/N] Integrate processing thread (#14193)
- Introduce a new internal config flag to enable processing threads
- If enabled, create a scheduling task manager inside the normal task manager (renamings will be added on top of this), and use it from the stream thread
- All operations inside the task manager that change task state, lock the corresponding tasks if processing threads are enabled.
- Adds a new abstract class AbstractPartitionGroup. We can modify the underlying implementation depending on the synchronization requirements. PartitionGroup is the unsynchronized subclass that is going to be used by the original code path. The processing thread code path uses a trivially synchronized SynchronizedPartitionGroup that uses object monitors. Further down the road, there is the opportunity to implement a weakly synchronized alternative. The details are complex, but since the implementation is essentially a queue + some other things, it should be feasible to implement this lock-free.
- Refactorings in StreamThreadTest: Make all tests use the thread member variable and add tearDown in order avoid thread leaks and simplify debugging. Make the test parameterized on two internal flags: state updater enabled and processing threads enabled. Use JUnit's assume to disable all tests that do not apply.
Enable some integration tests with processing threads enabled.

Reviewer: Bruno Cadonna <bruno@confluent.io>
2023-10-24 10:17:55 +02:00
Mickael Maison 8b9f6d17f2
KAFKA-15093: Add 3.5 Streams upgrade system tests (#14602)
Reviewers: Matthias J. Sax <mjsax@apache.org>
2023-10-23 13:26:50 +02:00
Mickael Maison 9c77c17c4e
KAFKA-15664: Add 3.4 Streams upgrade system tests (#14601)
Reviewers: Luke Chen <showuon@gmail.com>,  Matthias J. Sax <mjsax@apache.org>
2023-10-23 10:33:59 +02:00
Christo Lolov b5ec6e8a0d
KAFKA-14133: Move RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest to Mockito (#14586)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-10-20 16:09:36 +02:00
Hanyu Zheng bbdf6de88a
KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 (#14477)
Implements KIP-985.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-10-19 10:16:19 -07:00
Matthias J. Sax 9b468fb278
MINOR: Do not end Javadoc comments with `**/` (#14540)
Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bill@confluent.io>, Hao Li <hli@confluent.io>, Josep Prat <josep.prat@aiven.io>
2023-10-17 21:11:04 -07:00
Lucas Brutschy e7e399b940
MINOR: allow removing a suspended task from task registry. (#14555)
When we get a suspended task re-assigned in the eager rebalance protocol, we have to add the task back to the state updater so that it has a chance to catch up with its change log.

This was prevented by a check in Tasks, which disallows removing SUSPENDED tasks from the task registry. I couldn't find a reason why this must be an invariant of the task registry, so this weakens the check.

The error happens in the integration between TaskRegistry and TaskManager. However, this change anyway adds unit tests to more closely specify the intended behavior of the two modules.

Reviewers: Bruno Cadonna <bruno@confluent.io>
2023-10-17 14:32:41 +02:00
Hanyu Zheng 732bffcae6
KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest (#14523)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-10-16 17:30:05 -07:00
Matthias J. Sax d4c661c017
MINOR: cleanup warnings in Kafka Streams code base (#14549)
Reviewers: Guozhang Wang <wangguoz@gmail.com>, A. Sophie Blee-Goldman <sophie@responsive.dev>
2023-10-15 19:32:32 -07:00
Matthias J. Sax 649e2cbc8f
MINOR: Fix `Consumed` to return new object instead of `this` (#14550)
We embrace immutability and thus should return a new object instead of
`this`, similar to other config classed we use in the DSL.

Side JavaDocs cleanup for a bunch of classes.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-10-15 19:28:54 -07:00
Matthias J. Sax cd1b7639cb
MINOR: cleanup some warning in Kafka Streams examples (#14547)
Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-10-13 19:00:22 -07:00
Ismael Juma 4cf86c5d2f
KAFKA-15492: Upgrade and enable spotbugs when building with Java 21 (#14533)
Spotbugs was temporarily disabled as part of KAFKA-15485 to support Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0 which adds support for JDK 21 and enables it's usage on build again.

Reviewers: Divij Vaidya <diviv@amazon.com>
2023-10-12 14:09:10 +02:00
Bruno Cadonna c7f730d9d9
MINOR: Only commit running active and standby tasks when tasks corrupted (#14508)
When tasks are found corrupted, Kafka Streams tries to commit
the non-corrupted tasks before closing and reviving the corrupted
active tasks. Besides active running tasks, Kafka Streams tries
to commit restoring active tasks and standby tasks. However,
restoring active tasks do not need to be committed since they
do not have offsets to commit and the current code does not
write a checkpoint. Furthermore, trying to commit restoring
active tasks with the state updater enabled results in the
following error:

java.lang.UnsupportedOperationException: This task is read-only
at org.apache.kafka.streams.processor.internals.ReadOnlyTask.commitNeeded(ReadOnlyTask.java:209)
...

since commitNeeded() is not a read-only method for active tasks.

In future, we can consider writing a checkpoint for active
restoring tasks in this situation. Additionally, we should
fix commitNeeded() in active tasks to be read-only.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-10-12 13:24:54 +02:00
Levani Kokhreidze 7d1847c4c3
MINOR: Fix KafkaStreams#streamThreadLeaveConsumerGroup logging (#14526)
Fixes logging for KafkaStreams#streamThreadLeaveConsumerGroup.

In order not to lose the trace of the whole exception, passing Exception e as a second argument, while message is pre-formatted and passed as string as a first argument. With this, we won't loose the stack trace of the exception.

Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
2023-10-11 16:14:25 -07:00
Levani Kokhreidze 5dd155f350
KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)
With https://issues.apache.org/jira/browse/KAFKA-10575 StateRestoreListener#onRestoreSuspended was added. But local tests show that it is never called because DelegatingStateRestoreListener was not updated to call a new method

Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>, Bruno Cadonna <cadonna@confluent.io>
2023-10-11 16:04:34 -07:00
Christo Lolov a0e3d01fef
KAFKA-14133: Move MeteredTimestampedKeyValueStoreTest, ReadOnlyWindowStoreFacadeTest and TimestampedWindowStoreBuilderTest to Mockito (#14412)
Reviewers: Divij Vaidya <diviv@amazon.com>, Yash Mayya <yash.mayya@gmail.com>
2023-10-11 11:12:31 +02:00
Bruno Cadonna c32d2338a7
KAFKA-10199: Enable state updater by default (#13927)
Now that the implementation for the state updater is done, we can enable it by default.

This PR enables the state updater by default and fixes code that made assumptions that are not true when the state updater is enabled (mainly tests).

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-10-04 13:58:44 +02:00
Lucas Brutschy 6263197a62
KAFKA-15326: [9/N] Start and stop executors and cornercases (#14281)
* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and requested unassigment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught exception to a task executor
* Improved logging
* Number of threads controlled from outside, of the TaskManager

Reviewers: Bruno Cadonna <bruno@confluent.io>
2023-10-02 15:41:21 +02:00
Lucas Brutschy b58864e476
MINOR: Logging fix in StreamsPartitionAssignor (#14435)
Fix broken log message

Reviewer: A. Sophie Blee-Goldman <ableegoldman@apache.org>
2023-10-02 12:30:49 +02:00
Hao Li e71f6ebc81
MINOR: only log error when rack aware assignment is enabled (#14415)
Reviewers:  Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2023-09-29 10:16:29 -07:00
Bruno Cadonna 673a25acc3
KAFKA-10199: Do not unlock state directories of tasks in state updater (#14442)
When Streams completes a rebalance, it unlocks state directories
all unassigned tasks. Unfortunately, when the state updater is enabled,
Streams does not look into the state updater to determine the
unassigned tasks.
This commit corrects this by adding the check.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2023-09-27 17:51:30 +02:00
Lucas Brutschy 079e5d647c
KAFKA-15326: [8/N] Move consumer interaction out of processing methods (#14226)
The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:

* It resumes processing when the PartitionGroup buffers are empty
* It fetches the lag from the consumer

We introduce updateLags() and 
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.

Reviewer: Bruno Cadonna <bruno@confluent.io>
2023-09-26 18:17:23 +02:00
Bruno Cadonna 65efb98134
KAFKA-10199: Do not process when in PARTITIONS_REVOKED (#14265)
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.

With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.

This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2023-09-26 15:25:30 +02:00
Lucas Brutschy 2d04370bca
KAFKA-10199: Fix restoration behavior for paused tasks (#14437)
State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty.

Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-09-26 14:05:55 +02:00
Bruno Cadonna a46da90b8f
KAFKA-10199: Add missing catch for lock exception (#14403)
The state directory throws a lock exception during initialization if a task state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores the exception, and tries to initialize the task in the next exception.

In the state updater code path, we missed catching the lock exception when Streams recycles a task. That leads to the lock exception thrown to the exception handler, which is unexpected and leads to test failures.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2023-09-26 10:58:37 +02:00
Lucas Brutschy 9c2e5daf60
MINOR: Revert log level changes in LogCaptureAppender (#14436)
LogCaptureAppender sets the log level in various tests to check if a certain log message is produced. The log level is however never reverted, changing the log level across the board and introducing flakiness due to non-determinism since the log level depends on execution order. Some log messages change the timing inside tests significantly.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-09-26 10:49:41 +02:00
Ismael Juma 98febb989a
KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N) (#14427)
This is one of the steps required for kafka to compile with Java 21.

For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.

In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.

See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
2023-09-24 05:59:29 -07:00
Christo Lolov 5bdea94c05
KAFKA-14133: Move MeteredSessionStoreTest, MeteredWindowStoreTest and ReadOnlyKeyValueStoreFacadeTest to Mockito (#14404)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-09-22 11:27:11 +02:00
Divij Vaidya 9e5ca8416d
MINOR: Fix kafka-site formatting (#14419)
Reviewers: Satish Duggana <satishd@apache.org>, Josep Prat <jlprat@apache.org>
2023-09-21 11:28:31 +02:00
Wuzhengyu97 fcd382138e
MINOR: Used Admin instead of AdminClient to create Admin (#14411)
Used Admin instead of AdminClient to create Admin

Reviewers: Ziming Deng <dengziming1993@gmail.com>
2023-09-21 11:01:08 +08:00
Christo Lolov 58da419035
KAFKA-14133: Move KeyValueIteratorFacadeTest, KeyValueSegmentTest and MeteredKeyValueStoreTest to Mockito (#14396)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-09-19 11:24:04 +02:00
Nick Telford f041efa5fd
KAFKA-13973: Fix inflated block cache metrics (#14317)
All block cache metrics are being multiplied by the total number of
column families. In a `RocksDBTimestampedStore`, we have 2 column
families (the default, and the timestamped values), which causes all
block cache metrics in these stores to become doubled.

The cause is that our metrics recorder uses `getAggregatedLongProperty`
to fetch block cache metrics. `getAggregatedLongProperty` queries the
property on each column family in the database, and sums the results.

Since we always configure all column families to share the same block
cache, that causes the same block cache to be queried multiple times for
its metrics, with the results added togehter, effectively multiplying
the real value by the total number of column families.

To fix this, we should simply use `getLongProperty`, which queries a
single column family (the default one). Since all column families share
the same block cache, querying just one of them will give us the correct
metrics for that shared block cache.

Note: the same block cache is shared among all column families of a store
irrespective of whether the user has configured a shared block cache
across multiple stores.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2023-09-18 11:09:10 +02:00
Lucas Brutschy 07a18478be
KAFKA-15326: [7/N] Processing thread non-busy waiting (#14180)
Avoid busy waiting for processable tasks. We need to be a bit careful here to not have the task executors to sleep when work is available. We have to make sure to signal on the condition variable any time a task becomes "processable". Here are some situations where a task becomes processable:

- Task is unassigned from another TaskExecutor.
- Task state is changed (should only happen inside when a task is locked inside the polling phase).
- When tasks are unlocked.
- When tasks are added.
- New records available.
- A task is resumed.

So in summary, we

- We should probably lock tasks when they are paused and unlock them when they are resumed. We should also wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR). We add DefaultTaskManager.signalProcessableTasks for this.
- We need to awake the task executors in DefaultTaskManager.unassignTask, DefaultTaskManager.unlockTasks and DefaultTaskManager.add.


Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2023-09-11 09:58:20 +02:00
Lucas Brutschy eb39c95080
MINOR: StoreChangelogReaderTest fails with log-level DEBUG (#14300)
A mocked method is executed unexpectedly when we enable DEBUG
log level, leading to confusing test failures during debugging.
Since the log message itself seems useful, we adapt the test
to take the additional mocked method call into account).

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-09-06 14:49:48 +02:00
Yash Mayya d34d84dbef
KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito (#14152)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-09-04 13:54:18 +02:00
Christo Lolov 7a516b0386
KAFKA-14133: Move AbstractStreamTest and RocksDBMetricsRecordingTriggerTest to Mockito (#14223)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-09-04 12:58:50 +02:00
Rohan cc53889aaa
KAFKA-15429: reset transactionInFlight on StreamsProducer close (#14326)
Resets the value of transactionInFlight to false when closing the
StreamsProducer. This ensures we don't try to commit against a
closed producer

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2023-09-02 18:14:14 -07:00
Rohan d293cd0735
KAFKA-15429: catch+log errors from unsubscribe in streamthread shutdown (#14325)
Preliminary fix for KAFKA-15429 which updates StreamThread.completeShutdown to
catch-and-log errors from consumer.unsubscribe. Though this does not prevent
the exception, it does preserve the original exception that caused the stream
thread to exit.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2023-09-02 18:13:16 -07:00
Lucas Brutschy 16dc983ad6
Kafka Streams Threading: Timeout behavior (#14171)
Implement setting and clearing task timeouts, as well as changing the output on exceptions to make
it similar to the existing code path. 

Reviewer: Walker Carlson <wcarlson@apache.org>
2023-08-31 15:21:01 -05:00
A. Sophie Blee-Goldman 95e1cdc4ef
HOTFIX: avoid placement of unnecessary transient standby tasks & improve assignor logging (#14149)
Minor fix to avoid creating unnecessary standby tasks, especially when these may be surprising or unexpected as in the case of an application with num.standby.replicas = 0 and warmup replicas disabled.

The "bug" here was introduced during the fix for an issue with cooperative rebalancing and in-memory stores. The fundamental problem is that in-memory stores cannot be unassigned from a consumer for any period, however temporary, without being closed and losing all the accumulated state. This caused some grief when the new HA task assignor would assign an active task to a node based on the readiness of the standby version of that task, but would have to remove the active task from the initial assignment so it could first be revoked from its previous owner, as per the cooperative rebalancing protocol. This temporary gap in any version of that task among the consumer's assignment for that one intermediate rebalance would end up causing the consumer to lose all state for it, in the case of in-memory stores.

To fix this, we simply began to place standby tasks on the intended recipient of an active task awaiting revocation by another consumer. However, the fix was a bit of an overreach, as we assigned these temporary standby tasks in all cases, regardless of whether there had previously been a standby version of that task. We can narrow this down without sacrificing any of the intended functionality by only assigning this kind of standby task where the consumer had previously owned some version of it that would otherwise potentially be lost.

Also breaks up some of the long log lines in the StreamsPartitionAssignor and expands the summary info while moving it all to the front of the line (following reports of missing info due to truncation of long log lines in larger applications)
2023-08-30 13:29:38 -07:00
Christo Lolov dbda60c60d
KAFKA-14133: Move RocksDBRangeIteratorTest, TimestampedKeyValueStoreBuilderTest and TimestampedSegmentTest to Mockito (#14222)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-30 11:19:22 +02:00
Taher Ghaleb 3b02e97b65
KAFKA-15403: Refactor @Test(expected) annotation with assertThrows (#14264)
assertThrows makes the verification of exceptions clearer and more intuitive, thus improving code readability compared to the annotation approach. It is considered a test smell in the research literature. One possible research is due to developers not keeping up to date with recent versions of testing frameworks.

All such patterns in streams have been refactored.

Reviewers: vamossagar12 <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
2023-08-29 09:27:20 -07:00
Christo Lolov 664f71b207
KAFKA-14133: Move RecordCollectorTest, StateRestoreCallbackAdapterTest and StoreToProcessorContextAdapterTest to Mockito (#14210)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-28 12:06:37 +02:00
Satish Duggana 9e3b1f9b9b
MINOR Bump trunk to 3.7.0-SNAPSHOT (#14286)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-25 05:03:38 +05:30
Phuc-Hong-Tran 8d12c1175c
KAFKA-15152: Fix incorrect format specifiers when formatting string (#14026)
Reviewers: Divij Vaidya <diviv@amazon.com>

Co-authored-by: phuchong.tran <phuchong.tran@servicenow.com>
2023-08-24 19:38:45 +02:00
Christo Lolov 86afa416d2
KAFKA-14133: Move mocks from KStreamTransformValuesTest, KTableImplTest and KTableTransformValuesTest to Mockito (#14204)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-22 10:55:54 +02:00
Walker Carlson ad76497b12
KAFKA-14936: fix grace period partition issue (#14269)
Move the store creation to builder pattern and recover mintimestamp

Reviewers: John Roesler<vvcephei@apache.org>, Bill Bejeck <bbejeck@gmail.com>
2023-08-21 16:08:38 -05:00
Bruno Cadonna 05c329e61f
KAFKA-10199: Change to RUNNING if no pending task to init exist (#14249)
A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle and to init.

Usually all pending tasks to init are added to the state updater
in the same poll iteration that handles the assignment. However,
if during an initialization of a task a LockException the task
is re-added to the tasks to init and initialization is retried
in the next poll iteration.

A LockException might occur when a state directory is still locked
by another thread, when the rebalance just happened.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-08-19 19:00:23 +02:00
Bruno Cadonna 4c7e0a9fa6
MINOR: Decouple purging committed records from committing (#14227)
Currently, Kafka Streams only tries to purge records whose
offset are committed from a repartition topic when at
least one offset was committed in the current commit.
The coupling between committing some offsets and purging
records is not needed and might delay purging of records.
For example, if a in-flight call for purging records has not
completed yet when a commit happens, a new call
is not issued.
If then the earlier in-flight call for purging records
finally completes but the next commit does not commit any
offsets, Streams does not issue the call for purging records
whose offset were committed in the previous commit
because the purging call was still in-flight.

This change issues calls for purging records during any commit
if the purge interval passed, even if no offsets were committed
in the current commit.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-08-19 12:13:30 +02:00
Walker Carlson d0b7677c2c
KAFKA-14936: Add restore logic (3/N) (#14027)
Added restore logic for the buffer in grace period joins.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
2023-08-18 22:00:04 -05:00
Matthias J. Sax b36cf4ef97
HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5 (#14103)
KIP-904 introduced a backward incompatible change that requires a 2-bounce rolling upgrade.
The new "3.4" upgrade config value is not recognized by `AssignorConfiguration` though and thus crashed Kafka Streams if use.

Reviewers: Farooq Qaiser <fqaiser94@gmail.com>, Bruno Cadonna <bruno@confluent.io>
2023-08-18 11:06:08 -07:00
Lucas Brutschy ee036ed9ef
KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 (#14216)
Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12

Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
2023-08-18 18:31:27 +02:00
Lucas Brutschy d85a700813
MINOR: Do not reuse admin client across tests (#14225)
Reusing an admin client across tests can cause false positives in leak checkers, so don't do it.

Reviewers: Divij Vaidya <diviv@amazon.com>, Matthias J. Sax <matthias@confluent.io>
2023-08-17 10:53:58 -07:00
Christo Lolov d0e9e94629
KAFKA-14133: Migrate ActiveTaskCreatorTest, ChangelogTopicsTest and GlobalProcessorContextImplTest to Mockito (#14209)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-16 10:19:35 +02:00
bachmanity1 cfe49d1b77
KAFKA-7438: Replace EasyMock with Mockito in SessionStoreBuilderTest (#14142)
Reviewers: Divij Vaidya <diviv@amazon.com>, Yash Mayya <yash.mayya@gmail.com>
2023-08-16 10:01:49 +02:00
Christo Lolov 1a15cd708a
KAFKA-14133: Migrato SessionCacheFlushListenerTest, TimestampedCacheFlushListenerTest and TimestampedTupleForwarderTest to Mockito (#14205)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-16 09:46:40 +02:00
bachmanity1 fd6c9f16ba
KAFKA-7438: Replace Easymock & Powermock with Mockito in RocksDBMetricsRecorderGaugesTest (#14190)
Reviewers: Christo Lolov <christololov@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-08-15 11:48:13 +02:00
Lucas Brutschy 5234ddff50
KAFKA-15326: [5/N] Processing thread punctuation (#14001)
Implements punctuation inside processing threads. The scheduler
algorithm checks if a task that is not assigned currently can
be punctuated, and returns it when a worker thread asks for the
next task to be processed. Then, the processing thread runs all
punctuations in the punctionation queue.

Piggy-backed: take TaskExecutionMetadata into account when
processing records.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-08-14 17:17:28 +02:00
bachmanity1 ae46c0a34c
KAFKA-7438: Replace Easymock & Powermock with Mockito in TableSourceNodeTest (#14189)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-12 10:41:07 +02:00
Hao Li 3a94670a01
MINOR: Fix streams task assignor tests (#14196)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-08-12 10:38:07 +02:00
Florin Akermann 1e747a24a3
KAFKA-13197: fix GlobalKTable join/left-join semantics documentation. (#14187)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-08-11 11:45:18 -07:00
Hao Li 4268e502ec
KAFKA-15022: [10/N] docs for rack aware assignor (#14181)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-08-11 11:40:33 -07:00
Hao Li 96631c25d5
KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor (#14178)
Part of KIP-925.

Use rack aware assignor in StickyTaskAssignor.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-08-09 19:50:43 -07:00
Hao Li 87308167b1
KAFKA-15022: [8/N] more tests for HAAssignor (#14164)
Part of KIP-925.

- Add more tests for HighAvailabilityTaskAssignor
- Remove null and optional check for RackAwareTaskAssignor
- Pass rack aware assignor configs to getMainConsumerConfigs so that they can be picked up in rebalance protocol
- Change STATELESS_NON_OVERLAP_COST to 0. It was a mistake to be 1. Stateless tasks should be moved without this cost.
- Update of existing tests

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-08-09 16:42:53 -07:00
Lucia Cerchie a1cb4b4025
add changes made before merge (#14137)
Change in response to KIP-941.

New PR due to merge issue.

Changes line 57 in the RangeQuery class file from:

public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
    return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
}
to

public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
     return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
 }
Testing strategy:

Since null values can now be entered in RangeQuerys in order to receive full scans, I changed the logic defining query starting at line 1085 in IQv2StoreIntegrationTest.java from:

        final RangeQuery<Integer, V> query;
        if (lower.isPresent() && upper.isPresent()) {
            query = RangeQuery.withRange(lower.get(), upper.get());
        } else if (lower.isPresent()) {
            query = RangeQuery.withLowerBound(lower.get());
        } else if (upper.isPresent()) {
            query = RangeQuery.withUpperBound(upper.get());
        } else {
            query = RangeQuery.withNoBounds();
        }
to

query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
because different combinations of isPresent() in the bounds is no longer necessary.

Reviewers: John Roesler <vvcephei@apache.org>, Bill Bejeck <bbejeck@apache.org>
2023-08-08 15:03:42 -04:00
Hao Li 60a5117001
KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor (#14139)
Part of KIP-915.

- Change TaskAssignor interface to take RackAwareTaskAssignor
- Integrate RackAwareTaskAssignor to StreamsPartitionAssignor and HighAvailabilityTaskAssignor
- Update HAAssignor tests

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias J. Sax <matthias@confluent.io>
2023-08-08 08:01:05 -07:00
Matthias J. Sax 1c04ae8e61
MINOR: Improve JavaDocs of KafkaStreams `context.commit()` (#14163)
Reviewers: Bill Bejeck <bill@confluent.io>
2023-08-08 07:51:59 -07:00
Hao Li 8dec3e6616
KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer (#14150)
Part of KIP-925.

- Add configs for rack aware assignor
- Update standby optimizer in RackAwareTaskAssignor to have more rounds
- Refactor some method in RackAwareTaskAssignorTest to test utils so that they can also be used in HighAvailabilityTaskAssignorTest and other tests

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-08-07 11:21:55 -07:00
Matthias J. Sax faf36357f3
MINOR: improve logging for FK-join (#14105)
Reviewers: Colt McNealy <colt@littlehorse.io>, Walker Carlson <wcarlson@confluent.io>
2023-08-04 21:06:53 -07:00
Bruno Cadonna 7782741262
KAFKA-10199: Change to RUNNING if no pending task to recycle exist (#14145)
A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle.

There are situations in which a stream thread might only have
standby tasks that are recycled to active task after a rebalance.
In such situations, the stream thread might be faster in checking
active tasks in restoration then the state updater removing the
standby task to recycle from the state updater. If that happens
the stream thread changes to RUNNING although it should wait until
the standby tasks are recycled to active tasks and restored.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2023-08-04 09:07:58 +02:00
Hao Li bb48b157af
KAFKA-15022: [5/N] compute rack aware assignment for standby tasks (#14108)
Part of KIP-925.

Reviewer: Matthias J. Sax <matthias@confluent.io>
2023-08-02 19:20:23 -07:00
Hao Li 0ce16406e0
KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment (#14097)
Part of KIP-925.

For rack aware standby task assignment, we can either use the already existing "rack tags" or as a fall-back the newly added "rack.id". This PR unifies both without the need to change the actual standby task assignment logic.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-08-01 17:33:24 -07:00
Hao Li ed44bcd71b
KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks (#14030)
Part of KIP-925.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-07-26 16:02:52 -07:00
Hao Li 6bb88ae2f7
KAFKA-15022: [2/N] introduce graph to compute min cost (#13996)
Part of KIP-925.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-07-20 16:17:47 -07:00
Christo Lolov 8f313eaed4
KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-07-20 18:16:18 +02:00
Walker Carlson e9fe2a2eea
KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N) (#13942)
Check the history retention of the ktable of the grace period join.

Reviewers: Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2023-07-20 16:01:21 +02:00
Federico Valeri 334c41d604
KAFKA-14734: Use CommandDefaultOptions in StreamsResetter (#13983)
This PR adds CommandDefaultOptions usage like in the other joptsimple based tools. It also moves the associated unit test class from streams to tools module as discussed in #13127 (comment)

Reviewers:  Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
2023-07-20 18:45:05 +08:00
Lucas Brutschy 5f20750dc1
Kafka Streams Threading: Exception handling (#13957)
Catch any exceptions that escape the processing logic
inside TaskExecutors and record them in the TaskManager.
Make sure the TaskExecutor survives, but the task is
unassigned. Add a method to TaskManager to drain the
exceptions. The aim here is that the polling thread will
drain the exceptions to be able to execute the
uncaught exception handler, abort transactions, etc.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-07-13 14:33:39 +02:00
ezio 170f5f4ed0
KAFKA-15148: Mark tests correctly as integration tests where they running as unit tests (#13973)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-07-12 13:41:58 +02:00
Hao Li 0e56cc8841
KAFKA-15022: [1/N] initial implementation of rack aware assignor (#13851)
Part of KIP-925. Adds first internal classes to track rack.id client/partition metadata.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-07-10 08:41:20 -07:00
Aneel Kumar fd5b300b57
MINOR: Fix typo in javadoc (#13972)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-07-07 16:50:35 +02:00
Bruno Cadonna 5c2492bca7
KAFKA-10199: Consider tasks in state updater when computing offset sums (#13925)
With the state updater, the task manager needs also to look into the
tasks owned by the state updater when computing the sum of offsets
of the state. This sum of offsets is used by the high availability
assignor to assign warm-up replicas.
If the task manager does not take into account tasks in the
state updater, a warm-up replica will never report back that
the state for the corresponding task has caught up. Consequently,
the warm-up replica will never be dismissed and probing rebalances
will never end.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-07-03 16:35:34 +02:00
Ismael Juma 1f4cbc5d53
MINOR: Add JDK 20 CI build and remove some branch builds (#12948)
It's good for us to add support for Java 20 in preparation for Java 21 - the next LTS.

Given that Scala 2.12 support has been deprecated, a Scala 2.12 variant is not included.

Also remove some branch builds that add load to the CI, but have
low value: JDK 8 & Scala 2.13 (JDK 8 support has been deprecated),
JDK 11 & Scala 2.12 (Scala 2.12 support has been deprecated) and
JDK 17 & Scala 2.12 (Scala 2.12 support has been deprecated).

A newer version of Mockito (4.9.0 -> 4.11.0) is required for Java 20 support, but we
only use it with Scala 2.13+ since it causes compilation errors with Scala 2.12. Similarly,
we upgrade easymock when the Java version is 16 or newer as it's incompatible
with powermock (which doesn't support Java 16 or newer).

Filed KAFKA-15117 for a test that fails with Java 20 (SslTransportLayerTest.testValidEndpointIdentificationCN).

Finally, fixed some lossy conversions that were added after #13582 was submitted.

Reviewers: Ismael Juma <ismael@juma.me.uk>
2023-06-30 01:12:00 -07:00
Walker Carlson 12be344fdd
KAFKA-14936: Add Grace period logic to Stream Table Join (2/N) (#13855)
This PR adds the interface for grace period to the Joined object as well as uses the buffer. The majority of it is tests and moving some of the existing join logic.

Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2023-06-29 14:14:04 +02:00
Bo Gao 005416879e
KAFKA-15053: Use case insensitive validator for security.protocol config (#13831)
Fixed a regression described in KAFKA-15053 that security.protocol only allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lower case and upper case values will be supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl)

Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
2023-06-29 10:13:21 +02:00
Ismael Juma 9c8aaa2c35
MINOR: Fix lossy conversions flagged by Java 20 (#13582)
An example of the warning:
> warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy

There should be no change in behavior as part of these changes - runtime logic ensured
we didn't run into issues due to the lossy conversions.

Reviewers: Divij Vaidya <diviv@amazon.com>
2023-06-22 08:05:55 -07:00
minjian.cai 474053d297
MINOR: fix typos for streams (#13888)
Reviewers: Divij Vaidya <diviv@amazon.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>
2023-06-20 23:03:42 +02:00
Shekhar Rajak 0e8c436c7d
KAFKA-7438: Migrate to Mockito in TimeOrderedCachingPersistentWindowStoreTest (#12739)
Replaces EasyMock and PowerMock with Mockito in TimeOrderedCachingPersistentWindowStoreTest.

Reviewers: Divij Vaidya <diviv@amazon.com>, Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
2023-06-20 13:51:56 +02:00
Iblis Lin 41a8e55634
MINOR: fix doc typo of StreamsBuilder.addGlobalStore (#13871)
Reviewers: Manyanda Chitimbo <manyanda.chitimbo@gmail.com>
2023-06-19 10:33:44 +02:00
Christo Lolov c5df47a1cb
KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito (#13711)
Reviewers: Bruno Cadonna <cadonna@apache.org>
2023-06-15 14:55:55 +02:00
Walker Carlson 4a5d1b3205
KAFKA-14936: Add On Disk Time Ordered Buffer (1/N) (#13756)
KAFKA-14936: Add On Disk Time Ordered Buffer

Add a time ordered key-value buffer stored on disk and implemented using RocksDBTimeOrderedKeyValueSegmentedBytesStore.

This will be used in the stream buffered for joins with a grace period.

Reviewers: Bruno Cadonna <cadonna@confluent.io> Victoria Xia <victoria.xia@confluent.io>
2023-06-14 15:16:55 -05:00
Manyanda Chitimbo 044d058e03
MINOR: remove unused field ProcessorNode#time (#13624)
Reviewers: Divij Vaidya <diviv@amazon.com>
2023-06-14 15:30:57 +02:00
Christo Lolov 7f0e45590a
KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito (#13712)
This pull requests migrates the Admin mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.

Reviewers: Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Bruno Cadonna <cadonna@apache.org>
2023-06-13 10:48:07 +02:00
Bruno Cadonna 6fe74f78dc
KAFKA-10199: Re-add revived tasks to the state updater after handling (#13829)
Fixes a bug regarding the state updater where tasks that experience corruption
during restoration are passed from the state updater to the stream thread
for closing and reviving but then the revived tasks are not re-added to
the state updater.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-06-11 15:14:46 +02:00
Walker Carlson daba741826
KAFKA-14936: Change Time Ordered Buffer to not require Change<> 0/N (#13830)
Change the TimeOrderedKeyValueBuffer to take three types to include the store type so that it can be used for non change<V> operations as well.

Reviewers: Victoria Xia<victoria.xia@confluent.io> , Gabriel Gama <>
2023-06-10 17:22:32 -05:00
David Jacot 7eea2a3908
MINOR: Move MockTime to server-common (#13823)
This patch rewrite `MockTime` in Java and moves it to `server-common` module. This is a prerequisite to move `MockTimer` later on to `server-common` as well. 

Reviewers: David Arthur <mumrah@gmail.com>
2023-06-09 08:54:25 +02:00
Danica Fine 513e1c641d
KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map (#13751)
Replace usage of Cluster in StreamsMetadataState with Map<String, List>. Update StreamsPartitionAssignor#onAssignment method to pass existing Map<TopicPartition, PartitionInfo> instead of fake Cluster object.

Behavior remains the same; updated existing unit tests accordingly.

Reviewers:  Walker Carlson <wcarlson@confluent.io>, Bill Bejeck <bbejeck@apache.org>
2023-06-07 15:35:11 -04:00
Milind Mantri 4b46bb4904
KAFKA-12562: Fix KafkaStreams#store old references in comments (#13774)
Following method was deprecated in 2.5 and was removed in 3.0.0.

// KafkaStreams.java
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);

However, many comments reference the removed method which can be confusing in generated JavaDocs. The code in java doc comments has been changed to reflect the new method, store(final StoreQueryParameters<T> storeQueryParameters).

Also, minor changes to variable names in java doc to be context specific.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-06-01 09:16:00 +02:00
Mehari Beyene 560ab2cc31
KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito (#13681)
This pull requests migrates the ActiveTaskCreator mock in TaskManagerTest from EasyMock to Mockito
The change is restricted to a single mock to minimize the scope and make it easier for review.
Please see two examples that follow the same pattern below:
#13529
#13621

Reviewers: Divij Vaidya <diviv@amazon.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Christo Lolov <lolovc@amazon.com>, Bruno Cadonna <cadonna@apache.org>
2023-06-01 08:47:31 +02:00
Matthias J. Sax e96a463561
KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734)
Reviewers: Walker Carlson <<wcarlson@confluent.io>
2023-05-21 14:39:12 -07:00
A. Sophie Blee-Goldman 6d2ad4a383
HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API (#13695)
The VersionedKeyValueToBytesStoreAdapter#isOpen API accidentally returns the value of inner.persistent() when it should be returning inner.isOpen()

Reviewers: Matthias J. Sax <mjsax@apache.org>, Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Victoria Xia <victoria.xia@confluent.io>
2023-05-10 13:39:15 -07:00
Matthias J. Sax b40a7fc037
HOTFIX: fix broken Streams upgrade system test (#13654)
Reviewers: Victoria Xia <victoria.xia@confluent.io>, John Roesler <john@confluent.io>
2023-05-08 14:24:11 -07:00
Christo Lolov 2b98f8553b
KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito (#13621)
Migrates ChangeLogReader mock in TaskManagerTest to mockito.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-05-08 16:24:52 +02:00
Bruno Cadonna 141c76a2c9
KAFKA-14133: Migrate topology builder mock in TaskManagerTest to mockito (#13529)
1. Migrates topology builder mock in TaskManagerTest to mockito.

2. Replaces the unit test to verify if subscribed partitions are added
to topology metadata.

3. Modifies signatures of methods for adding subscribed partitions to
topology metadata to use sets instead of lists. This makes the
intent of the methods clearer and makes the tests more portable.

Reviewers: Christo Lolov <lolovc@amazon.com>, Matthias J. Sax <mjsax@apache.org>
2023-05-02 14:00:34 +02:00
LinShunKang dd6690a7a0
KAFKA-14944: Reduce CompletedFetch#parseRecord() memory copy (#12545)
This implements KIP-863: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
Direct use ByteBuffer instead of byte[] to deserialize.

Reviewers: Luke Chen <showuon@gmail.com>, Kirk True <kirk@kirktrue.pro>
2023-04-27 10:44:08 +08:00
Matthias J. Sax 2557a4b842
KAFKA-12446: update change encoding to use varint (#13533)
KIP-904 had the goal in mind to save space when encoding the size on a byte array. However, using UINT32 does not achieve this goal. This PR changes the encoding to VARINT instead.

Reviewers: Victoria Xia <victoria.xia@confluent.io>,  Farooq Qaiser <fqaiser94@gmail.com>, Walker Carlson <wcarlson@confluent.io>
2023-04-24 15:29:57 -07:00
Victoria Xia ab8f285097
KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores (#13615)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-24 14:06:26 -07:00
Matthias J. Sax 6dcdb01732
KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)
Stream-stream outer join, uses a "shared time tracker" to track stream-time progress for left and right input in a single place. This time tracker is incorrectly shared across tasks.

This PR introduces a supplier to create a "shared time tracker" object per task, to be shared between the left and right join processors.

Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-04-24 12:40:25 -07:00
Victoria Xia 11c8bf4826
KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest` (#13609)
This PR fixes a bug in the table-table join handling of out-of-order records in versioned tables where if the latest value for a particular key is a tombstone, by using the isLatest value from the Change object instead of calling get(key) on the state store to fetch timestamps to compare against. As part of this fix, this PR also updates table-table joins to determine whether upstream tables are versioned by using the GraphNode mechanism, instead of checking the table's value getter.

Part of KIP-914.

Reviewer: Matthias J. Sax <matthias@confluent.io>
2023-04-19 16:34:36 -07:00
Matthias J. Sax 3388adf1b5
MINOR: rename internal FK-join processor classes (#13589)
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
2023-04-18 11:32:27 -07:00
Matthias J. Sax bf3a5a3e04
MINOR: remove unnecessary `KeyValueMapper` (#13545)
Reviewers: Christo Lolov (@clolov), Bill Bejeck <bill@confluent.io>
2023-04-14 14:37:40 -07:00
Philip Nee fc9df51be5
MINOR: improve ProductionExceptionHandler test (#13576)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-14 13:03:45 -07:00
Matthias J. Sax c958d8719d
Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)
This reverts commit d9b139220e.

KIP-878 implementation did not make any progress, so we need to revert
the public API changes which are not functional right now.

Reviewers: Bill Bejeck <bill@confluent.io>
2023-04-14 12:08:49 -07:00
Matthias J. Sax 20028e24cc
KAFKA-14054: Handle TimeoutException gracefully (#13534)
We incorrectly assumed, that `consumer.position()` should always be
served by the consumer locally set position.

However, within `commitNeeded()` we check if first `if(commitNeeded)`
and thus go into the else only if we have not processed data (otherwise,
`commitNeeded` would be true). For this reason, we actually don't know
if the consumer has a valid position or not.

We should just swallow a timeout if the consumer cannot get the position
from the broker, and try the next partition. If any position advances, we
can return true, and if we timeout for all partitions we can return
false.

Reviewers: Michal Cabak (@miccab), John Roesler <john@confluent.io>, Guozhang Wang <guozhand@confluent.io>
2023-04-14 09:43:53 -07:00
Matthias J. Sax 5767d12963
MINOR: Refactor changelogger to accept timestamp (#13563)
Reviewers: Bill Bejeck <bill@confluent.io>
2023-04-14 07:14:10 -07:00
Mickael Maison dc1ede8d89
MINOR: Bump trunk to 3.6.0-SNAPSHOT (#13570)
Reviewers: David Jacot <djacot@confluent.io>
2023-04-14 14:17:07 +02:00
Matthias J. Sax b1830e4aa2
KAFKA-14834: [9/N] Disable versioned-stores for unsupported operations (#13565)
Using versioned-stores for global-KTables is not allowed, because a
global-table is bootstrapped on startup, and a stream-globalTable join
does not support temporal semantics.

Furthermore, `suppress()` does not support temporal semantics and thus
cannot be applied to an versioned-KTable.

This PR disallows both use-cases explicitely.

Part of KIP-914.

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Victoria Xia <victoria.xia@confluent.io>
2023-04-14 11:43:54 +02:00
Philip Nee b8d8fcdd62
KAFKA-7499: Handle serialization error in ProductionExceptionHandler (#13477)
Implements KIP-399.

Extends ProductionExceptionHandler to handle serialization errors, and to allow users to continue processing and dropping the corresponding record on the floor.

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2023-04-13 23:36:26 -07:00
Victoria Xia f1eb260fea
KAFKA-14834: [10/N] Reserve repartition topic formats to include isLatest (#13566)
KIP-914 introduced a new boolean isLatest into Change to indicate whether a change update represents the latest for the key. Even though Change is serialized into the table repartition topic, the new boolean does not need to be serialized in, because the table repartition map processor performs an optimization to drop records for which isLatest = false. If not for this optimization, the downstream table aggregate would have to drop such records instead, and isLatest would need to be serialized into the repartition topic.

In light of the possibility that isLatest may need to be serialized into the repartition topic in the future, e.g., if other downstream processors are added which need to distinguish between records for which isLatest = true vs isLatest = false, this PR reserves repartition topic formats which include isLatest. Reserving these formats now comes at no additional cost to users since a rolling bounce is already required for the upcoming release due to KIP-904. If we don't reserve them now and instead have to add them later, then another bounce would be required at that time. Reserving formats is cheap, so we choose to do it now.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-13 21:56:36 -07:00
Victoria Xia a87edf13d5
KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change` (#13564)
Part of KIP-914.

This PR adds an additional boolean isLatest into Change which specifies whether the new value is the latest for its key. For un-versioned stores, isLatest is always true. For versioned stores, isLatest is true if the value has the latest timestamp seen for the key, else false. This boolean will be used by processors such as the table repartition map processor to determine when a record is out-of-order and should be dropped (when processing a versioned table).  This PR updates the table repartition map processor accordingly, and also adds test coverage for table filter.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-13 21:20:05 -07:00
Victoria Xia 9217c7e151
KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo (#13554)
Part of KIP-914.

This PR updates the return type of VersionedKeyValueStore#put(...) from void to long, where the long is the validTo timestamp of the newly put record, with two special values to indicate either that no such timestamp exists (because the record is the latest for its key) or that the put did not take place (because grace period has elapsed). 

As part of making this change, VersionedBytesStore introduces its own put(key, value, timestamp) method to avoid method signature conflicts with the existing put(key, value) method from KeyValueStore<Bytes, byte[]> which has void return type. As a result, the previously added NullableValueAndTimestampSerde class is no longer needed so it's also been removed in this PR as cleanup.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-13 20:19:42 -07:00
Victoria Xia 7d580dc7a2
KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes (#13552)
This PR adds a method into GraphNode to assist in tracking whether tables are materialized as versioned or unversioned stores. This is needed in order to allow processors which have different behavior on versioned vs unversioned tables to use the correct semantics. Part of KIP-914.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-13 14:21:28 -07:00
Victoria Xia 1d5d003ff4
KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522)
This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-12 19:05:10 -07:00
Victoria Xia 1395ad6497
KAFKA-14834: [4/N] Drop out-of-order records from table-table join with versioned tables (#13510)
This PR updates primary-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-12 17:06:28 -07:00
Victoria Xia 88e2d6b8c2
KAFKA-14834: [3/N] Timestamped lookups for stream-table joins (#13509)
This PR updates the stream-table join processors, including both KStream-KTable and KStream-GlobalKTable joins, to perform timestamped lookups when the (global) table is versioned, as specified in KIP-914.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-12 16:54:15 -07:00
Guozhang Wang 750a389308
MINOR: Follow-up on failing streams test, and fix StoreChangelogReader (#13523)
1. I've verified and made sure the only case that task would be null and not stream task would be in testing code only, with pausing / resuming topologies; I've revamped the restoration recording func, mainly to make just one method on the Task interface, to make sure we would never get task == null and do not need to cast to StreamTask.
2. Use numRecords directly to avoid calling records.size() that triggers concurrent modifications.
3. Rewrite the TaskMetricsTest to not use the removed impl functions.
4. Found an issue while fixing 1) above, turns out it's related to pausing tasks: if the tasks are paused due to instance / named-topologies are paused while they need restoration, the restoration would never finish, and hence the instance's state would not transit to RUNNING. Similarly, if user paused just one of the named-topology right at the beginning, since the state would not transit to RUNNING, every tasks across all named-topologies would not make progress. We keep the behavior as is to be consistent with and without state-updater.

Reviewers: Matthias J. Sax <mjsax@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
2023-04-12 14:57:02 -07:00
Victoria Xia 7c74f3983b
KAFKA-14491: [21/N] Docs updates for versioned state stores (#13444)
Add docs for KIP-889.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-12 11:31:27 -07:00
Victoria Xia 17b4569d70
KAFKA-14834: [2/N] Test coverage for out-of-order data in joins (#13497)
In preparation for updating DSL join processors to have updated semantics when versioned stores are used (cf KIP-914), this PR adds test coverage for out-of-order data in joins to the existing integration tests for stream-table joins and primary-key table-table joins. Follow-up PRs will build on top of this change by adding new tests for versioned stores, and the out-of-order data will produce different results in those settings.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-11 20:42:55 -07:00
Victoria Xia cb7d0833ee
KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter (#13496)
In preparation for updating DSL processors to use versioned stores (cf KIP-914), this PR adds two new methods to KTableValueGetter: isVersioned() and get(key, asOfTimestamp) and updates all existing implementations accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-11 20:40:11 -07:00
Farooq Qaiser 396536bb5a
KAFKA-12446: Call subtractor before adder if key is the same (#10747)
Implements KIP-904.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-10 13:27:04 -07:00
Victoria Xia 17435484e4
KAFKA-14491: [22/N] Add test for manual upgrade to versioned store (#13449)
Adds an integration test for the manual upgrade scenario to upgrade a non-versioned store to a versioned store. The procedure is outlined in KIP-889 and also in the docs.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-07 11:15:17 -07:00
Philip Nee ef453dd1ad
KAFKA-12634 enforce checkpoint after restoration (#13269)
Under at-least-once, we want to ensure checkpointing the progress after completing the restoration to prevent losing the progress and needing to restore from scratch.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
2023-04-07 11:18:40 +02:00
Luke Chen f02f5f8c8a
MINOR: fix stream failing tests (#13512)
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
2023-04-06 09:00:10 -07:00
Lucas Brutschy 2117c4bce8
Minor: fix ReadOnlyTaskTest (#13519)
Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-04-06 08:56:07 -07:00
Chia-Ping Tsai 3bbff167fa
MINOR: fix invalid usage in java docs (#13506)
Reviewers: Luke Chen <showuon@gmail.com>
2023-04-06 16:01:14 +08:00
Guozhang Wang b2ee6df1c4
KAFKA-14172: Should clear cache when active recycled from standby (#13369)
This fix is inspired by #12540.

1. Added a clearCache function for CachedStateStore, which would be triggered upon recycling a state manager.
2. Added the integration test inherited from #12540 .
3. Improved some log4j entries.
4. Found and fixed a minor issue with log4j prefix.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2023-04-05 16:05:11 -07:00
Guozhang Wang 653baa6694
KAFKA-10199: Add task updater metrics, part 2 (#13300)
Part of KIP-869

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2023-04-05 11:49:08 -07:00
Victoria Xia df59cc1a01
KAFKA-14491: [20/N] Add public-facing methods for versioned stores (#13442)
Until this PR, all the code added for KIP-889 for introducing versioned stores to Kafka Streams has been accessible from internal packages only. This PR exposes the stores via public Stores.java methods, and also updates the TopologyTestDriver.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-05 09:27:53 -07:00
Guozhang Wang beb0be5fe4
KAFKA-14533: Do not interrupt state-updater thread during shutdown (#13318)
1. Fix the StateUpdater shutdown procedure: a) in shutdown, we first set the running flag, then notify the condition; b) in the thread's waitIfAllChangelogsCompletelyRead block, we collapse the if condition together with the while condition so that we always check all four conditions once the thread is notified inside the while loop. As a result, shutdown procedure would not involve any thread interruptions anymore.
2. Print fine-grained streams exception when list-offset fails, this is a byproduct of the debugging procedure but I think it's worth keeping since it has better operational visibilities.
3. Some nit logging improvements (including moving logger from the inner thread into the outer class to also add some more logging).
4. Re-enable state-updater in SmokeTestDriverIntegrationTest.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2023-04-04 15:29:00 -07:00
Victoria Xia babfb1778b
KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close (#13470)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-03 21:29:40 -07:00
Victoria Xia 63fee01366
KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one (#13431)
The RocksDB-based versioned store implementation introduced in KIP-889 currently uses two physical RocksDB instances per store instance: one for the "latest value store" and another for the "segments store." This PR combines those two RocksDB instances into one by representing the latest value store as a special "reserved" segment within the segments store. This reserved segment has segment ID -1, is never expired, and is not included in the regular Segments methods for getting or creating segments, but is represented in the physical RocksDB instance the same way as any other segment.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-03 21:27:19 -07:00
Victoria Xia f503aa3ab4
KAFKA-14491: [16/N] Add recovery logic for store inconsistency due to failed write (#13364)
The RocksDB-based implementation of versioned stores introduced via KIP-889 consists of a "latest value store" and separate (logical) "segments stores." A single put operation may need to modify multiple (two) segments, or both a segment and the latest value store, which opens the possibility to store inconsistencies if the first write succeeds while the later one fails. When this happens, Streams will error out, but the store still needs to be able to recover upon restart. This PR adds the necessary repair logic into RocksDBVersionedStore to effectively undo the earlier failed write when a store inconsistency is encountered.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-04-03 21:23:48 -07:00
vamossagar12 c14f56b484
KAFKA-14586: Moving StreamResetter to tools (#13127)
Moves StreamResetter to tools project.

Reviewers: Federico Valeri <fedevaleri@gmail.com>, Christo Lolov <lolovc@amazon.com>, Bruno Cadonna <cadonna@apache.org>
2023-03-28 14:43:22 +02:00
Spacrocket 71ca8ef4ec
KAFKA-14722: Make BooleanSerde public (#13382)
KAFKA-14722: Make BooleanSerde public (#13328)

Addition of boolean serde
https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface

During the task KAFKA-14491 Victoria added BooleanSerde class, It will be useful to have such class in public package.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Divij Vaidya <diviv@amazon.com>
2023-03-24 10:41:51 -05:00
hudeqi f79c2a6e04
MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest (#13427)
Co-authored-by: Deqi Hu <deqi.hu@shopee.com>

Reviewers: Ziming Deng <dengziming1993@gmail.com>, Guozhang Wang <guozhang.wang.us@gmail.com>
2023-03-23 09:36:35 -07:00
Victoria Xia 45ecae6a28
KAFKA-14491: [15/N] Add integration tests for versioned stores (#13340)
Adds integration tests for the new versioned stores introduced in KIP-889.

This PR also contains a small bugfix for the restore record converter, required to get the tests above to pass: even though versioned stores are timestamped stores, we do not want to use the record converter for prepending timestamps when restoring a versioned store.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-03-22 10:26:06 -07:00
Victoria Xia 1560c5bd7e
KAFKA-14491: [18/N] Update versioned store to check latest value on timestamped get (#13409)
Part of KIP-889.

Prior to this PR, versioned stores always returned null for get(key, timestamp) calls where the timestamp has exceeded the store's history retention, even if the latest value for the key (i.e., the one returned from get(key)) satisfies the timestamp bound. This was an oversight from the earlier implementation -- get(key, timestamp) should still return a record in this situation since the record exists in the store. This PR updates both the javadocs and the implementation accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-03-21 17:12:05 -07:00
Victoria Xia bfd15299b1
KAFKA-14491: [14/N] Set changelog topic configs for versioned stores (#13292)
Sets the correct topic configs for changelog topics for versioned stores introduced in KIP-889. Changelog topics for versioned stores differ from those for non-versioned stores only in that min.compaction.lag.ms needs to be set in order to prevent version history from being compacted prematurely.

The value for min.compaction.lag.ms is equal to the store's history retention plus some buffer to account for the broker's use of wall-clock time in performing compactions. This buffer is analogous to the windowstore.changelog.additional.retention.ms value for window store changelog topic retention time, and uses the same default of 24 hours. In the future, we can propose a KIP to expose a config such as versionedstore.changelog.additional.compaction.lag.ms to allow users to tune this value.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-03-21 17:08:10 -07:00
Victoria Xia 361095a1a7
KAFKA-14491: [17/N] Refactor segments cleanup logic
Part of KIP-899.

AbstractSegments automatically calls the helper method to clean up expired segments as part of getOrCreateSegmentIfLive(). This works fine for windowed store implementations which call getOrCreateSegmentIfLive() exactly once per put() call, but is inefficient and difficult to reason about for the new RocksDBVersionedStore implementation (cf. #13188) which makes potentially multiple calls to getOrCreateSegmentIfLive() for different segments for a single put() call. This PR addresses this by refactoring the call to clean up expired segments out of getOrCreateSegmentIfLive(), opting to have the different segments implementations specify when cleanup should occur instead. After this PR, RocksDBVersionedStore only cleans up expired segments once per call to put().

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-03-20 20:03:50 -07:00
Lucas Brutschy 6fae237638
MINOR: Use JUnit-5 extension to enforce strict stubbing (#13347)
A privious change disabled strict stubbing for the `RocksDBMetricsRecorderTest`. To re-enable the behavior in JUnit-5, we need to pull in a new dependency in the `streams` gradle project.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-03-20 13:49:35 -07:00
Victoria Xia 84351efd51
KAFKA-14491: [13/N] Add versioned store builder and materializer (#13274)
This PR introduces VersionedKeyValueStoreBuilder for building the new versioned stores introduced in KIP-889, analogous to the existing TimestampedKeyValueStoreBuilder for building timestamped stores. This PR also updates the existing KTable store materializer class to materialize versioned stores in addition to timestamped stores. As part of this change, the materializer is renamed from TimestampedKeyValueStoreMaterializer to simply KeyValueStoreMaterializer.

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
2023-03-06 17:13:33 -08:00
Christo Lolov 5b295293c0
MINOR: Remove unnecessary toString(); fix comment references (#13212)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>
2023-03-06 18:39:04 +01:00
littlehorse-eng a6d8988179
MINOR: Clarify docs for Streams config max.warmup.replicas. (#13082)
Documentation only—Minor clarification on how max.warmup.replicas works; specifically, that one "warmup replica" corresponds to a Task that is restoring its state. Also clarifies how max.warmup.replicas interacts with probing.rebalance.interval.ms.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
2023-03-03 11:00:51 -08:00
Lucas Brutschy 47450ee064
MINOR: update RocksDBMetricsRecorder test to JUnit5 and fix memory leak (#13336)
The test was leaking memory via Mockito internals. Piggy-backing an update to JUnit5.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-03-03 10:17:08 -08:00
Victoria Xia 517b5d2b09
KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores (#13264)
As part of introducing versioned key-value stores in KIP-889, we want to lift the existing DSL restriction that KTable stores are always TimestampedKeyValueStores to allow for KTable stores which are VersionedKeyValueStores instead. This PR lifts this restriction by replacing raw usages of TimestampedKeyValueStore with a new KeyValueStoreWrapper which supports either TimestampedKeyValueStore or VersionedKeyValueStore.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-03-02 14:14:30 -08:00
Guozhang Wang 5842953249
MINOR: Fix flaky tests in DefaultStateUpdaterTest (#13319)
Found a few flaky tests while reviewing another PR. The root cause seems to be with changing the return behavior of when in mockito. Fixed those without using reset and also bumped a couple debug log lines to info since they could be very helpful in debugging.

Reviewers: Luke Chen <showuon@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
2023-03-01 17:36:23 -08:00
Victoria Xia 400ba0aeae
KAFKA-14491: [11/N] Add metered wrapper for versioned stores (#13252)
Introduces the metered store layer for the new versioned key-value store introduced in KIP-889. This outermost, metered store layer handles all serialization/deserialization from VersionedKeyValueStore to a bytes-representation (VersionedBytesStore) so that all inner stores may operate only with bytes types.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-24 14:32:43 -08:00
Guozhang Wang 2fad165294
KAFKA-10199: Add task updater metrics, part 1 (#13228)
* Moved pausing-tasks logic out of the commit-interval loop to be on the top-level loop, similar to resuming tasks.
* Added thread-level restoration metrics.
* Related unit tests.

Reviewers: Lucas Brutschy <lucasbru@users.noreply.github.com>, Matthias J. Sax <matthias@confluent.io>
2023-02-24 10:25:11 -08:00
Lucia Cerchie 8c84d29c2e
KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161)
Kafka Streams is supposed to handle TimeoutException during internal topic creation gracefully. This PR fixes the exception handling code to avoid crashing on an TimeoutException returned by the admin client.

Reviewer: Matthias J. Sax <matthias@confluent.io>, Colin Patrick McCabe <cmccabe@apache.org>, Alexandre Dupriez (@Hangleton)
2023-02-22 22:51:51 -08:00
Victoria Xia a2c9f421af
KAFKA-14491: [10/N] Add changelogging wrapper for versioned stores (#13251)
Introduces the changelogging layer for the new versioned key-value store introduced in KIP-889. The changelogging layer operate on VersionedBytesStore rather than VersionedKeyValueStore so that the outermost metered store can serialize to bytes once and then all inner stores operate only with bytes types.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-21 13:27:54 -08:00
Lucas Brutschy 0fc029c6a4
KAFKA-14299: Fix pause and resume with state updater (#13025)
* Fixes required to make the PauseResumeIntegrationTest pass. It was not enabled and it does not pass for the state updater code path.

* Make sure no progress is made on paused topologies. The state updater restored one round of polls from the restore
consumer before realizing that a newly added task was already in paused state when being added.

* Wake up state updater when tasks are being resumed. If a task is resumed, it may be necessary to wake up the state updater from waiting on the tasksAndActions condition.

* Make sure that allTasks methods also return the tasks that are currently being restored.

* Enable PauseResumeIntegrationTest and upgrade it to JUnit5.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Guozhang Wang <wangguoz@gmail.com>
2023-02-21 10:17:09 -08:00
Victoria Xia 2e3bbe63c1
KAFKA-14491: [9/N] Add versioned bytes store and supplier (#13250)
As part of introducing versioned key-value stores in KIP-889, we'd like a way to represent a versioned key-value store (VersionedKeyValueStore<Bytes, byte[]>) as a regular key-value store (KeyValueStore<Bytes, byte[]>) in order to be compatible with existing DSL methods for passing key-value stores, e.g., StreamsBuilder#table() and KTable methods, which are explicitly typed to accept Materialized<K, V, KeyValueStore<Bytes, byte[]>. This way, we do not need to introduce new versions of all relevant StreamsBuilder and KTable methods to relax the Materialized type to accept versioned stores.

This PR introduces the new VersionedBytesStore extends KeyValueStore<Bytes, byte[]> interface for this purpose, along with the corresponding supplier (VersionedBytesStoreSupplier) and implementation (RocksDbVersionedKeyValueBytesStoreSupplier). The RocksDbVersionedKeyValueBytesStoreSupplier implementation leverages an adapter (VersionedKeyValueToBytesStoreAdapter) to assist in converting from VersionedKeyValueStore to VersionedBytesStore.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-17 11:06:04 -08:00
Chia-Ping Tsai 7e149990bd
KAFKA-14717 KafkaStreams can' get running if the rebalance happens be… (#13248)
I noticed this issue when tracing #12590.

StreamThread closes the consumer before changing state to DEAD. If the partition rebalance happens quickly, the other StreamThreads can't change KafkaStream state from REBALANCING to RUNNING since there is a PENDING_SHUTDOWN StreamThread

Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-02-17 08:40:34 -08:00
Philip Nee 82d5720aae
KAFKA-14253 - More informative logging (#13253)
Includes 2 requirements from the ticket:
* Include the number of members in the group (I.e., "15 members participating" and "to 15 clients as")
* Sort the member ids (to help compare the membership and assignment across rebalances)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-02-16 16:54:50 -08:00
Christo Lolov ba0c5b0902
MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-02-16 16:13:31 +01:00
Victoria Xia dcaf95a35f
KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)
Introduces a new Serde, that serializes a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with timestamp into the versioned store).

Part of KIP-889.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-15 18:07:47 -08:00
Victoria Xia bfeef29804
KAFKA-14491: [7/N] Enforce strict grace period for versioned stores (#13243)
Changes the versioned store semantics to define an explicit "grace period" property. Grace period will always be equal to the history retention, though in the future we could introduce a new KIP to expose options to configure grace period separately.

Part of KIP-889.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-15 18:02:20 -08:00
Victoria Xia 528a777df6
KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog (#13189)
This PR builds on the new RocksDB-based versioned store implementation (see KIP-889) by adding code for restoring from changelog. The changelog topic format is the same as for regular timestamped key-value stores: record keys, values, and timestamps are stored in the Kafka message key, value, and timestamp, respectively. The code for actually writing to this changelog will come in a follow-up PR.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-13 17:06:44 -08:00
Victoria Xia df22a9d0e6
KAFKA-14491: [5/N] Basic operations for RocksDB versioned store (#13188)
Introduces the VersionedKeyValueStore interface proposed in KIP-889, along with the RocksDB-based implementation of the interface. This PR includes fully functional put, get, get-with-timestamp, and delete operations, but does not include the ability to restore records from changelog or surrounding store layers (for metrics or writing to the changelog). Those pieces will come in follow-up PRs.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-10 17:30:09 -08:00
Guozhang Wang 083e11a22c
KAFKA-14650: Synchronize access to tasks inside task manager (#13167)
1. The major fix: synchronize access to tasks inside task manager, this is a fix of a regression introduced in #12397
2. Clarify on func names of StreamThread that maybe triggered outside the StreamThread.
3. Minor cleanups.

Reviewers: Lucas Brutschy <lucasbru@users.noreply.github.com>
2023-02-09 10:33:19 -08:00
Guozhang Wang 788793dee6
KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener (#13179)
1. Add the new API (default impl is empty) to StateRestoreListener.
2. Update related unit tests

Reviewers: Lucas Brutschy <lucasbru@users.noreply.github.com>, Matthias J. Sax <mjsax@apache.org>
2023-02-07 11:33:09 -08:00
Matthias J. Sax 463bb00b11
MINOR: remove unncessary helper method (#13209)
Reviewers: Christo Lolov (@clolov), Lucas Brutschy <lbrutschy@confluent.io>, Ismael Juma <ismale@confluent.io>
2023-02-07 11:21:58 -08:00
Christo Lolov a0a9b6ffea
MINOR: Remove unnecessary code (#13210)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-02-07 17:37:45 +01:00
Victoria Xia 4a7fedd46a
KAFKA-14491: [3/N] Add logical key value segments (#13143)
Part of KIP-889

Reviewers: Matthias J. Sax <matthias@confuent.io>
2023-02-03 17:26:33 -08:00
Victoria Xia b8e606355b
KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-02 21:48:40 -08:00
Shekhar Rajak 3cf13064cc
Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest (#12777)
Related to KAFKA-14059 and KAFKA-14132

* Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest.java
* Reset removed which was not needed

Reviewers: Divij Vaidya <diviv@amazon.com>, Guozhang Wang <wangguoz@gmail.com>
2023-02-02 16:03:47 -08:00
Victoria Xia 65bb819313
KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store (#13126)
Part of KIP-889.

The KIP proposed the introduction of versioned key-value stores, as well as a RocksDB-based implementation. The RocksDB implementation will consist of a "latest value store" for storing the latest record version associated with each key, in addition to multiple "segment stores" to store older record versions. Within a segment store, multiple record versions for the same key will be combined into a single bytes array "value" associated with the key and stored to RocksDB.

This PR introduces the utility class that will be used to manage the value format of these segment stores, i.e., how multiple record versions for the same key will be combined into a single bytes array "value." Follow-up PRs will introduce the versioned store implementation itself (which calls heavily upon this utility class).

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-02-01 13:19:53 -08:00
Jorge Esteban Quilcate Otoya 7d61d4505a
KAFKA-14409: Clean ProcessorParameters from casting (#12879)
Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, John Roesler <vvcephei@apache.org>
2023-01-31 15:55:50 -06:00
Lucas Brutschy eb7f490159
chore: Fix scaladoc warnings (#13164)
Make sure no scaladoc warnings are emitted from the streams-scala project build.
We cannot fully fix all scaladoc warnings due to limitations of the scaladoc tool,
so this is a best-effort attempt at fixing as many warnings as possible. We also
disable one problematic class of scaladoc wornings (link errors) in the gradle build.

The causes of existing warnings are that we link to java members from scaladoc, which
is not possible, or we fail to disambiguate some members.

The broad rule applied in the changes is
 - For links to Java members such as [[StateStore]], we use the fully qualified name in a code tag
   to make manual link resolution via a search engine easy.
 - For some common terms that are also linked to Java members, like [[Serde]], we omit the link.
 - We disambiguate where possible.
 - In the special case of @throws declarations with Java Exceptions, we do not seem to be able
   to avoid the warning altogther.

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
2023-01-31 09:00:48 -08:00
Victoria Xia 6c98544a96
KAFKA-14491: [2/N] Refactor RocksDB store open iterator management (#13142)
This PR refactors how the list of open iterators for RocksDB stores is managed. Prior to this PR, the `openIterators` list was passed into the constructor for the iterators themselves, allowing `RocksDbIterator.close()` to remove the iterator from the `openIterators` list. After this PR, the iterators themselves will not know about lists of open iterators. Instead, a generic close callback is exposed, and it's the responsibility of the store that creates a new iterator to set the callback on the iterator, to ensure that closing an iterator removes the iterator from the list of open iterators.

This refactor is desirable because it enables more flexible iterator lifecycle management. Building on top of this, RocksDBStore is updated with an option to allow the user (i.e., the caller of methods such as `range()` and `prefixScan()` which return iterators) to pass a custom `openIterators` list for the new iterator to be stored in. This will allow for a new Segments implementation where multiple Segments can share the same RocksDBStore instance, while having each Segment manage its own open iterators.

Part of KIP-889.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-01-31 00:05:43 -08:00
Matthias J. Sax dc01199271
KAFAK-14660: Fix divide-by-zero vulnerability (#13175)
This PR adds a safe-guard for divide-by-zero. While `totalCapacity` can never be zero, an explicit error message is desirable.

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
2023-01-30 23:39:41 -08:00
Lucas Brutschy 1d0585563b
MINOR: fix flaky DefaultStateUpdaterTest (#13160)
Mockito should not make named topologies paused by default.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
2023-01-24 15:32:04 -08:00
A. Sophie Blee-Goldman 3799708ff0
KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest (#13156)
I immediately saw a failure with stateUpdaterEnabled = true after disabling the false parameter, which suggests the problem actually does lie in the state updater itself and not the act of parametrization of the test. To verify this theory, and help stabilize the 3.4 release branch, let's try one more test by swapping out the true build in favor of the false one. If the listOffsets requests stop failing and causing this integration test to hit the global timeout as is currently happening at such a high rate, then we have pretty good evidence pointing at the state updater and should be able to debug things more easily from there.

After getting in a few builds to see whether the flakiness subsides, we should merge this PR to re-enable both parameters going forward: https://github.com/apache/kafka/pull/13155

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2023-01-24 00:14:20 -08:00
A. Sophie Blee-Goldman ee8e757878
temporarily disable the 'false' parameter (#13147)
Need to get a clean build for 3.4 and this test has been extremely flaky. I'm looking into the failure as well, and want to pinpoint whether it's the true build that's broken or it's the parameterization itself causing this -- thus, let's start by temporarily disabling the false parameter first.

See KAFKA-14533 for more details

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-01-23 15:24:35 -08:00
A. Sophie Blee-Goldman 0601fa0935
MINOR: fix flaky integrations tests by using 60s default timeout for startup (#13141)
The timeouts used for starting up Streams and waiting for the RUNNING state are all over the place across our integration tests, with some as low as 15s (which are unsurprisingly rather flaky). We use 60s as the default timeout for other APIs in the IntegrationTestUtils so we should do the same for #startApplicationAndWaitUntilRunning

I also noticed that we have several versions of that exact API in StreamsTestUtils, so I migrated everyone over to the IntegrationTestUtils#startApplicationAndWaitUntilRunning and added a few overloads for ease of use, including one for single KafkaStreams apps and one for using the default timeout

Reviewers: Matthias J. Sax <mjsax@apache.org>
2023-01-22 15:57:58 -08:00
A. Sophie Blee-Goldman 123e0e9ca9
MINOR: fix warnings in Streams javadocs (#13132)
While working on the 3.4 release I noticed we've built up an embarrassingly long list of warnings within the Streams javadocs. It's unavoidable for some links to break as the source code changes, but let's reset back to a good state before the list gets even longer

Reviewers: Matthias J. Sax <mjsax@apache.org>, Walker Carlson <wcarlson@confluent.io>
2023-01-20 14:19:11 -08:00
Christo Lolov e235e1a3fe
KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests (#12821)
Batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14132 which use PowerMock/EasyMock and need to be moved to Mockito.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-01-19 18:44:08 +01:00
Christo Lolov 90967e81e2
Replace EasyMock with Mockito in streams tests (#12818)
Batch 6 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-01-19 14:55:01 +01:00
Divij Vaidya b2bc72dc79
MINOR: Include the inner exception stack trace when re-throwing an exception (#12229)
While wrapping the caught exception into a custom one, information about the caught
exception is being lost, including information about the stack trace of the exception.

When re-throwing an exception, we either include the original exception or the relevant
information is added to the exception message.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>, Matthew de Detrich <mdedetrich@gmail.com>
2023-01-15 15:03:23 -08:00
Federico Valeri 111f02cc74
KAFKA-14568: Move FetchDataInfo and related to storage module (#13085)
Part of KAFKA-14470: Move log layer to storage module.

Reviewers: Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Ismael Juma <ismael@juma.me.uk>
2023-01-12 21:32:23 -08:00
Lucas Brutschy 22606a0a4d
KAFKA-14530: Check state updater more often (#13017)
In the new state restoration code, the state updater needs to be checked regularly
by the main thread to transfer ownership of tasks back to the main thread once the
state of the task is restored. The more often we check this, the faster we can
start processing the tasks.

Currently, we only check the state updater once in every loop iteration of the state
updater. And while we couldn't observe this to be strictly not often enough, we can
increase the number of checks easily by moving the check inside the inner processing
loop. This would mean that once we have iterated over `numIterations` records, we can
already start processing tasks that have finished restoration in the meantime.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-01-12 12:40:07 +01:00
Christo Lolov 78d4458b94
KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (#12301)
This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14003. It is the second of a series of pull requests which address the move of Kafka Streams tests from JUnit 4 to JUnit 5.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2023-01-11 09:26:48 +01:00
José Armando García Sancio 896573f9bc
KAFKA-14279: Add 3.3.x streams system tests (#13077)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-01-09 23:37:05 -08:00
A. Sophie Blee-Goldman 2060b057b0
MINOR: bump streams quickstart pom versions and add to list in gradle.properties (#13064)
The three pom files for the Streams quickstart also need to bump their versions after a branch cut. For some reason these are included (late) in the Release Process guide, but are missing from the list of what to update when bumping the version in gradle.properties. This commit adds the missing files to this list to help future RMs locate all the required version changes

Reviewers: Ismael Juma <ismael@juma.me.uk>
2023-01-09 15:51:44 -08:00
Ismael Juma 96d9710c17
KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module (#13049)
Additional notable changes to fix multiple dependency ordering issues:

* Moved `ConfigSynonym` to `server-common`
* Moved synonyms from `LogConfig` to `ServerTopicConfigSynonyms `
* Removed `LogConfigDef` `define` overrides and rely on
   `ServerTopicConfigSynonyms` instead.
* Moved `LogConfig.extractLogConfigMap` to `KafkaConfig`
* Consolidated relevant defaults from `KafkaConfig`/`LogConfig` in the latter
* Consolidate relevant config name definitions in `TopicConfig`
* Move `ThrottledReplicaListValidator` to `storage`

Reviewers: Satish Duggana <satishd@apache.org>, Mickael Maison <mickael.maison@gmail.com>
2023-01-04 02:42:52 -08:00
Satish Duggana 8d28c3d55e
MINOR Fix checkstyle failures in streams/examples module. (#13055)
MINOR Fix checkstyle failures in streams/examples module. (#13055)
2022-12-29 16:29:18 +05:30
Himani Arora 202a8cd255
MINOR: Fixed type in KTable JavaDocs(#6867)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-12-28 16:06:59 -08:00
Chia-Ping Tsai a1db11e82b
MINOR: remove unused org.apache.kafka.streams.processor.internals.RestoringTasks (#10164)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-12-28 15:50:37 -08:00
David Karlsson 4e1b6d3f28
MINOR: Update WordCountTransformerDemo comments (#12470)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-12-28 15:39:19 -08:00
Vladimir Korenev eeedde7ea9
MINOR: Add implicit for Serde[UUID] to Streams Scala API (#8335)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-12-28 14:54:22 -08:00
Josep Prat 5f1810209f
MINOR: Fix small warning on javadoc and scaladoc (#11049)
Escape the `>` character in javadoc
Escape the `$` character when part of `${}` in scaladoc as this is the way to reference a variable

Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-12-28 13:41:45 -08:00
Qing 9c6c6bfa2b
KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)
Reviewers: Matthias J. Sax <matthias@confluent.io>, Hao Li <hli@confluent.io>
2022-12-28 12:32:54 -08:00
Greg Harris 8f0e6c6334
KAFKA-13881: Add Streams package infos (#12936)
Reviewers: Christo Lolov (@clolov), Matthias J. Sax <matthias@confluent.io>
2022-12-27 15:37:25 -08:00
Hao Li ca15735fa7
MINOR: remove onChange call in stream assignor assign method (#13034)
Reviewers: John Roesler <vvcephei@apache.org>
2022-12-21 18:32:05 -06:00
Lucas Brutschy 26daa8d610
MINOR: Fix various memory leaks in tests (#12959)
Various tests in the streams park were leaking native memory.

Most tests were fixed by closing the corresponding rocksdb resource.

I tested that the corresponding leak is gone by using a previous rocksdb
release with finalizers and checking if the finalizers would be called at some
point.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-12-21 13:38:05 +01:00
vamossagar12 409794b5ae
KAFKA-14461: Move StoreQueryIntegrationTest to junit5 and fixing logic in a couple of tests for finding active streams (#13014)
StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores and StoreQueryIntegrationTest#shouldQueryOnlyActivePartitionStoresByDefault has a logic to find active partitions by doing a modulo with 2 and comparing the remainder. This can break when a new test is added and since Junit chooses an arbitrary order to run the tests, modulo checks can fail. This PR tries to make it deterministic.
Also, this PR uses Junit5 annotations so that the cluster and input topic can be setup/destroyed once.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-12-21 13:33:55 +01:00
Lucas Brutschy 9df069f372
KAFKA-14299: Avoid interrupted exceptions during clean shutdown (#13026)
The call to `interrupt` on the state updater thread during shutdown
could interrupt the thread while writing the checkpoint file. This
can cause a failure to write the checkpoint file and a misleading
stack trace in the logs.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-12-21 08:48:12 +01:00
Bill Bejeck ea65d74f6b
MINOR: No error with zero results state query (#13002)
This PR updates StateQueryResult.getOnlyPartitionResult() to not throw an IllegaArgumentException when there are 0 query results.

Added a test that will fail without this patch

Reviewers: John Roesler<vvcephei@apache.org>
2022-12-19 13:39:06 -05:00
vamossagar12 a46d16e7ab
Removing Multicasting partitioner for IQ (#12977)
Follow up PR for KIP-837. We don't want to allow multicasting for IQ. This PR imposes that restriction.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-12-15 15:09:41 -08:00
Hao Li 9b23d9305d
KAFKA-14395: add config to configure client supplier (#12944)
Implements KIP-884.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-12-14 09:17:27 -08:00
vamossagar12 73ea6986df
KAFKA-13602: Remove unwanted logging in RecordCollectorImpl.java (#12985)
There is unwanted logging introduced by #12803 as pointed out in this comment: #12803 (comment). This PR removes it.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2022-12-13 16:36:00 +01:00
vamossagar12 2fa1879247
KAFKA-14454: Making unique StreamsConfig for tests (#12971)
Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions as part of KIP-837 passes when run individually but fails when is part of IT class and hence is marked as Ignored.

That seemed to have been because of the way StreamsConfig was being initialised so any new test would have used the same names. Because of which the second test never got to the desired state. With this PR, every test gets a unique app name which seems to have fixed the issue. Also, a couple of cosmetic changes

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-12-09 17:51:42 -08:00
A. Sophie Blee-Goldman d9b139220e
KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)
First PR for KIP-878: Internal Topic Autoscaling for Kafka Streams

Introduces two new configs related to autoscaling in Streams: a feature flag and retry timeout. This PR just adds the configs and gets them passed through to the Streams assignor where they'll ultimately be needed/used

Reviewers: Bill Bejeck <bill@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2022-12-09 15:02:36 -08:00
Lucas Brutschy 36a2f7bfd0
KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)
RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7).
The problem was that every call to options.statistics creates a new wrapper object that needs to be finalized.

I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Christo Lolov <lolovc@amazon.com>
2022-12-07 18:25:58 -08:00
Lucia Cerchie 923fea583b
KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
As a result of "14260: InMemoryKeyValueStore iterator still throws ConcurrentModificationException", I'm adding synchronized to prefixScan as an alternative to going back to the ConcurrentSkipList.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-12-06 19:39:32 -08:00
Lucas Brutschy 96b1db510a
KAFKA-14415: Faster ThreadCache (#12903)
Optimization of `ThreadCache`. The original implementation showed significant slow-down when many caches were registered.

`sizeBytes` was called at least once, and potentially many times
in every `put` and was linear in the number of caches (= number of
state stores, so typically proportional to number of tasks). That
means, with every additional task, every put gets a little slower.
This was confirmed experimentally.

In this change, we modify the implementation of `ThreadCache` to
keep track of the total size in bytes. To be independent of the
concrete implementation of the underlying cache, we update the size
by subtracting the old and adding the new size of the cache before
and after every modifying operation. For this we acquire the object
monitor of the cache, but since all modifying operations on the caches
are synchronized already, this should not cause extra overhead.

This change also fixes a `ConcurrentModificationException` that could
be thrown in a race between `sizeBytes` and `getOrCreate`.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-12-06 17:22:42 -08:00
vamossagar12 77e294e7fc
KAFKA-13602: Adding ability to multicast records (#12803)
This PR implements KIP-837 which enhances StreamPartitioner to multicast records.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, YEONCHEOL JANG
2022-12-06 02:01:38 -08:00
Divij Vaidya f1568e5996
MINOR: Prevent NPE in SmokeTestDriver (fix flaky test) (#12908)
SmokeTestDriverIntegrationTest.java can be flaky because a NullPointerException prevents the retry mechanism that is added to prevent flakiness for this test. This change, prevents the NullPointerException and hence, allows the test to retry itself.

Reviewers: Luke Chen <showuon@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
2022-12-06 10:52:58 +08:00
vamossagar12 6663acff23
KAFKA-13152: Add cache size metrics (#12778)
Adds the new DEBUG metric cache-size-bytes-total

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-11-30 17:54:37 -08:00
Joel Hamill d9946a7ffc
MINOR: Fix config documentation formatting (#12921)
Reviewers: José Armando García Sancio <jsancio@apache.org>
2022-11-30 08:54:39 -08:00
Lucas Brutschy 9ea3d0d1c8
KAFKA-12679: Handle lock exceptions in state updater (#12875)
In this change, we enable backing off when the state directory
is still locked during initialization of a task. When the state
directory is locked, the task is reinserted into the
initialization queue. We will reattempt to acquire the lock
after the next round of polling.

Tested through a new unit test.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-11-28 17:17:14 +01:00
Lucas Brutschy fea0eb4ca3
KAFKA-14299: Handle double rebalances better (#12904)
The original implementation of the state updater could not
handle double rebalances within one poll phase correctly,
because it could create tasks more than once if they hadn't
finished initialization yet.

In a55071a, we
moved initialization to the state updater to fix this. However,
with more testing, I found out that this implementation has
it's problems as well: There are problems with locking the
state directory (state updater acquired the lock to the state
directory, so the main thread wouldn't be able to clear the
state directory when closing the task), and benchmarks also
show that this can lead to useless work (tasks are being
initialized, although they will be taken from the thread soon
after in a follow-up rebalance).

In this PR, I propose to revert the original change, and fix
the original problem in a much simpler way: When we
receive an assignment, we simply clear out the
list of tasks pending initialization. This way, no double
tasks instantiations can happen.

The change was tested in benchmarks, system tests,
and the existing unit & integration tests. We also add
the state updater to the smoke integration test, which
triggered the double task instantiations before.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-11-28 13:16:44 +01:00
Christo Lolov 54efc4f109
KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12505)
Batch 2 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.

Reviewers: Matthew de Detrich <matthew.dedetrich@aiven.io>, Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org
2022-11-21 13:12:22 +01:00
Jorge Esteban Quilcate Otoya 0de037423b
KAFKA-14325: Fix NPE on Processor Parameters toString (#12859)
Handle null processor supplier

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-11-20 18:24:04 -08:00
A. Sophie Blee-Goldman 56ab2f8034
KAFKA-14382: wait for current rebalance to complete before triggering followup (#12869)
Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing loops. If we trigger a new rebalance while the current one is still ongoing, it may cause some members to fail the first rebalance if they weren't able to send the SyncGroup request in time (for example due to processing records during the rebalance). This means those consumers never receive their assignment from the original rebalance, and won't revoke any partitions they might have needed to. This can send the group into a loop as each rebalance schedules a new followup cooperative rebalance due to partitions that need to be revoked, and each followup rebalance causes some consumer(s) to miss the SyncGroup and never revoke those partitions.

Reviewers: John Roesler <vvcephei@apache.org>
2022-11-18 22:38:58 -08:00
Nick Telford 1d6430249b
KAFKA-14406: Fix double iteration of restoring records (#12842)
While restoring a batch of records, RocksDBStore was iterating the ConsumerRecords, building a list of KeyValues, and then iterating that list of KeyValues to add them to the RocksDB batch.

Simply adding the key and value directly to the RocksDB batch prevents this unnecessary second iteration, and the creation of itermediate KeyValue objects, improving the performance of state restoration, and reducing unnecessary object allocation.

This also simplifies the API of RocksDBAccessor, as prepareBatchForRestore is no longer needed.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Walker Carlson <wcarlson@confluent.io>
2022-11-18 20:44:56 -08:00
Bill Bejeck 3012332e3d
KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL (#12861)
With the addition of the new Processor API the newly added FixedKeyProcessorNodeFactory extends the ProcessorNodeFactory class. The ProcessorNodeFactory had a private field Set<String> stateStoreNames initialized to an empty see. The FixedKeyProcessorNodeFactory also had a private field Set<String> stateStoreNames.

When executing InternalTopologyBuilder.build executing the buildProcessorNode method passed any node factory as ProcessorNodeFactory and the method references the stateStoreNames field, it's pointing to the superclass field, which is empty so the corresponding StoreBuilder(s) are never added - causing NPE in the topology.

This PR makes the field protected on the ProcessorNodeFactory class so FixedKeyProcessorNodeFactory inherits it.

The added test fails without this change.

Reviewers: Matthias J. Sax <mjsax@apache.org>,  Sophie Blee-Goldman <sophie@confluent.io>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
2022-11-16 17:06:15 -05:00
Hao Li 76214bfb85
KAFKA-13785: Add JavaDocs for emit final (#12867)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2022-11-16 11:30:24 -08:00
Lucas Brutschy a55071a99f
KAFKA-14299: Initialize tasks in state updater (#12795)
The state updater code path puts tasks into an
"initialization queue", with created, but not initialized tasks.
These are later, during the event-loop, initialized and added
to the state updater. This might lead to losing track of those 
tasks - in particular it is possible to create
tasks twice, if we do not go once around `runLoop` to initialize
the task. This leads to `IllegalStateExceptions`. 

By handing the task to the state updater immediately and let the
state updater initialize the task, we can fulfil our promise to 
preserve the invariant "every task is owned by either the task 
registry or the state updater".

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-11-14 10:00:29 +01:00
A. Sophie Blee-Goldman e422a67d3f
KAFKA-14294: check whether a transaction is in flight before skipping a commit (#12835)
Add a new #transactionInFlight API to the StreamsProducer to expose the flag of the same name, then check whether there is an open transaction when we determine whether or not to perform a commit in TaskExecutor. This is to avoid unnecessarily dropping out of the group on transaction timeout in the case a transaction was begun outside of regular processing, eg when a punctuator forwards records but there are no newly consumer records and thus no new offsets to commit.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-11-14 09:43:46 +01:00
A. Sophie Blee-Goldman 51b7eb7937
KAFKA-14282: stop tracking Produced sensors by processor node id (#12836)
Users have been seeing a large number of these error messages being logged by the RecordCollectorImpl:

Unable to records bytes produced to topic XXX by sink node YYY as the node is not recognized.
It seems like we try to save all known sink nodes when the record collector is constructed, by we do so by going through the known sink topics which means we could miss some nodes, for example if dynamic topic routing is used. Previously we were logging an error and would skip recording the metric if we tried to send a record from a sink node it didn't recognize, but there's not really any reason to have been tracking the sensors by node in the first place -- we can just track the actual sink topics themselves.

Reviewers: John Roesler <vvcephei@apache.org>, Christo Lolov <lolovc@amazon.com>
2022-11-11 17:58:08 -08:00
Christo Lolov 876c338a60
[KAFKA-14324] Upgrade RocksDB to 7.1.2 (#12809)
Reviewers: Bruno Cadonna <cadonna@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-11-11 17:48:38 -08:00
Lucas Brutschy ce5faa222b
MINOR: Fix flaky RestoreIntegrationTest (#12841)
RestoreIntegrationTest used polling to determine if a rebalance
happens on one client, but if the rebalance would happen too quickly,
the polling would not pick it up and the check would time out.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-11-10 14:11:03 +01:00
Lucas Brutschy c034388a0a
KAFKA-14299: Avoid allocation & synchronization overhead in StreamThread loop (#12808)
The state updater code path introduced allocation and synchronization
overhead by performing relatively heavy operations in every iteration of
the StreamThread loop. This includes various allocations and acquiring
locks for handling `removedTasks` and `failedTasks`, even if the
corresponding queues are empty.

This change introduces `hasRemovedTasks` and
`hasExceptionsAndFailedTasks` in the `StateUpdater` interface that
can be used to skip over any allocation or synchronization. The new
methods do not require synchronization or memory allocation.

This change increases throughput by ~15% in one benchmark.

We extend existing unit tests to cover the slightly modified
behavior.

Reviewer: Bruno Cadonna <cadonna@apache.org>
2022-11-08 17:55:37 +01:00
Lucas Brutschy 4560978ed7
KAFKA-14309: FK join upgrades not tested with DEV_VERSION (#12760)
The streams upgrade system inserted FK join code for every version of the
the StreamsUpgradeTest except for the latest. Also, the original code
never switched on the `test.run_fk_join` flag for the target version of
the upgrade.

The effect was that FK join upgrades were not tested at all, since
no FK join code was executed after the bounce in the system test.

We introduce `extra_properties` in the system tests, that can be used
to pass any property to the upgrade driver, which is supposed to be
reused by system tests for switching on and off flags (e.g. for the
state restoration code).

Reviewers: Alex Sorokoumov <asorokoumov@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2022-11-07 15:46:51 -08:00
Lucas Brutschy cd5f6c60b5
KAFKA-14299: Avoid busy polling in state updater (#12772)
The state updater can enter a busy polling loop if it
only updates standby tasks. We need to use the user-provided
poll-time to update always when using the state updater, since
the only other place where the state update blocks
(inside `waitIfAllChangelogsCompletelyRead`) is also
not blocking if there is at least one standby task.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
2022-11-07 16:46:25 +01:00
Ashmeet Lamba a971448f3f
KAFKA-14254: Format timestamps as dates in logs (#12684)
Improves logs withing Streams by replacing timestamps to date instances to improve readability.

Approach - Adds a function within common.utils.Utils to convert a given long timestamp to a date-time string with the same format used by Kafka's logger.

Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
2022-11-07 13:42:39 +01:00
Lucas Brutschy 37a3645e7e
KAFKA-14299: Return emptied ChangelogReader to ACTIVE_RESTORING (#12773)
The ChangelogReader starts of in `ACTIVE_RESTORING` state, and
then goes to `STANDBY_RESTORING` when changelogs from standby
tasks are added. When the last standby changelogs are removed,
it remained in `STANDBY_RESTORING`, which means that an empty
ChangelogReader could be in either `ACTIVE_RESTORING` or
`STANDBY_RESTORING` depending on the exact sequence of
add/remove operations. This could lead the state updater into
an illegal state. Instead of changing the state updater,
I propose to stengthen the state invariant of the
`ChangelogReader` slightly: it should always be in
`ACTIVE_RESTORING` state, when empty.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
2022-11-07 13:35:18 +01:00
vamossagar12 7fd6a9b3e2
Kafka 12960: Follow up Commit to filter expired records from Windowed/Session Stores (#12756)
KAFKA-12960: Enforcing strict retention time for WindowStore and SessionStore

Reviewers: Luke Chen <showuon@gmail.com>, Vicky Papavasileiou
2022-11-07 11:53:34 +08:00