1. In state updater, when handling task corrupted exception due to invalid restoring offset, first delete the affected partitions from the checkpoint before reporting it back to the stream thread. This is to mimic the same behavior in stream threads's StateManager#handleCorruption#closeDirtyAndRevive. It's cleaner to do so inside the restore thread, plus it enables us to optimize by only deleting those corrupted partitions, and not all.
2. In the state manager, handle the drained exceptions as follows (this is the same as handling all exceptions from handleAssignment): 1) Task-migrated, throw all the way to stream-thread as handleTaskMigrated, 2) any fatal Streams exception, throw all the way to stream-thread to trigger exception handler, 3) Task-corrupted, throw to the stream-thread as handleCorruption. Note that for 3), we would specially distinguish if the corrupted-tasks are already closed (when they are thrown from handleAssignment or not (when they are thrown from the state updater).
Reviewers: Bruno Cadonna <cadonna@apache.org>
Based on a patch submitted to the confluentinc fork & then abandoned. Needed some updates and minor expansion but more or less just re-applied the changes proposed in confluentinc#697.
Original PR has a very detailed justification for these changes but the tl;dr of it is that apparently the PriorityQueue's iterator does not actually guarantee to return elements in priority order.
Reviewer: Luke Chen <showuon@gmail.com>
Changelogs are already unregistered when tasks are closed.
There is no need to also unregister them in the state
updater.
In future, when we will only have the state updater without
the old code path, we should consider registering and
unregistering the changelogs within the state updater.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
When the task manager is shutdown, the state updater should also
shutdown. After the shutdown of the state updater, the tasks
in its output queues should be closed.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
Changes:
- Migrate to Mockito
- Add more assertive checks using verify
- Minor indentation fixes
Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
The state updater removes its updating and paused task on shutdown.
The removed tasks are added to the output queue for removed tasks.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Once the state updater restored an active task it puts it
into an output queue. The stream thread reads the restored
active task from the output queue and after it verified
that the task is still owned by the stream thread it transits
it to RUNNING.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration `auto.include.jmx.reporter` that can be set to false to disable the JMX Reporter. This configuration is deprecated and will be removed in the next major version.
Reviewers: Tom Bentley <tbentley@redhat.com>, Christo Lolov <christo_lolov@yahoo.com>
Currently the task manager stores the tasks it manages in an
internally. We recently extracted the code to store and retrieve
tasks into its own class Tasks. However, the task manager creates
the Tasks object internally and during testing of the task
manager we do not have access to it which makes testing the task
manager quite complex.
This commit externalizes the data structure that the task manager
uses to store and rerieve tasks. It introduces the TasksRegistry
interface and lets the Tasks object implementing TasksRegistry.
The Tasks object is passed into the task manager via its
constructor. Passing the TasksRegistry dependency to the task
manager from outside faciliates simpler testing of the task
manager.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Removes tasks from the state updater when the input partitions of the tasks are revoked or partitions are lost during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch fixes another incorrect version check in the FK code and adds unit tests that would have caught this bug.
Reviewers: John Roesler <vvcephei@apache.org>
Compilation is failing after these two commits:
```
> Task :streams:compileJava
/Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
tasks.addPendingTaskToClose(restoringTask.id());
^
symbol: method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
1 error
```
Also here:
```
[2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
[2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in class TaskManagerTest
[2022-08-17T20:58:20.912Z] private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
```
This patch reverts them.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. Within the tryCompleteRestore function of the thread, try to drain the removed tasks from state updater and handle accordingly: 1) for recycle, 2) for closure, 3) for update input partitions.
2. Catch up on some unit test coverage from previous PRs.
3. Some minor cleanups around exception handling.
Reviewers: Bruno Cadonna <cadonna@apache.org>
We recently added a builder to create task mocks for unit
tests. This PR adds the functionality to add input partitions
to task mocks when the builder is used.
Reviewers: Walker Carlson <wcarlson@confluent.io>, A. Sophie Blee-Goldman <ableegoldman@apache.org>
The state updater exposes tasks that are in restoration
to the stream thread. To ensure that the stream thread
only accesses the tasks to read from the tasks without
modifying any internal state, this PR introduces a
read-only task that throws an exception if the caller
tries to modify the internal state of a task.
This PR also returns read-only tasks from
DefaultStateUpdater#getTasks().
Reviewer: Guozhang Wang <wangguoz@gmail.com>
While working on KAFKA-13877, I feel it's an overkill to introduce the whole test class as an integration test, since all we need is to just test the assignor itself which could be a unit test. Running this suite with 9+ instances takes long time and is still vulnerable to all kinds of timing based flakiness. A better choice is to reduce it as a unit test, similar to HighAvailabilityStreamsPartitionAssignorTest that just test the behavior of the assignor itself, rather than creating many instances hence depend on various timing bombs to not explode.
Since we mock everything, there's no flakiness anymore. Plus we greatly reduced the test runtime (on my local machine, the old integration takes about 35 secs to run the whole suite, while the new one take 20ms on average).
Reviewers: Divij Vaidya <diviv@amazon.com>, Dalibor Plavcic
In the current test, we check for tag distribution immediately after everyone is on the running state, however due to the fact of the follow-up rebalances, "everyone is now in running state" does not mean that the cluster is now stable. In fact, a follow-up rebalance may occur, upon which the local thread metadata would return empty which would cause the distribution verifier to fail.
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>
When the migration of the Streams project to JUnit 5 started with PR #12285, we discovered that the migrated tests were not run by the PR builds. This PR ensures that Streams' tests that are written in JUnit 4 and JUnit 5 are run in the PR builds.
Co-authored-by: Divij Vaidya <diviv@amazon.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>
Before this PR the calls to the static methods on StreamsMetricsImpl were just calls and not a verification on the mock. This miss happened during the switch from EasyMock to Mockito.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Bookkeeps tasks to be recycled, closed, and updated during handling of the assignment. The bookkeeping is needed for integrating the state updater.
These change is hidden behind internal config STATE_UPDATER_ENABLED. If the config is false Streams should not use the state updater and behave as usual.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. Consolidate the task recycle procedure into a single function within the task. The current procedure now becomes: a) task.recycleStateAndConvert, at end of it the task is in closed while its stateManager is retained, and the manager type has been converted; 2) create the new task with old task's fields and the stateManager inside the creators.
2. Move the task execution related metadata into the corresponding TaskExecutionMetadata class, including the task idle related metadata (e.g. successfully processed tasks); reduce the number of params needed for TaskExecutor as well as Tasks.
3. Move the task execution related fields (embedded producer and consumer) and task creators out of Tasks and migrated into TaskManager. Now the Tasks is only a bookkeeping place without any task mutation logic.
4. When adding tests, I realized that we should not add task to state updater right after creation, since it was not initialized yet, while state updater would validate that the task's state is already restoring / running. So I updated that logic while adding unit tests.
Reviewers: Bruno Cadonna <cadonna@apache.org>
This PR introduces an internal config to enable the state updater. If the state updater is enabled newly created tasks are added to the state updater. Additionally, this PR introduces a builder for mocks for tasks.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14001. It is the first of a series of pull requests which address the move of Kafka Streams tests from JUnit 4 to JUnit 5.
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
- used static memberId was incorrect
- need to remove all threads/members from the group
- need to use admit client correctly
Add test to verify fixes.
Reviewers: Matthias J. Sax <matthias@confluent.io>
In order to integrate with the state updater, we would need to refactor the TaskManager and Task interfaces. This PR achieved the following purposes:
Separate active and standby tasks in the Tasks placeholder, plus adding pendingActiveTasks and pendingStandbyTasks into Tasks. The exposed active/standby tasks from the Tasks set would only be mutated by a single thread, and the pending tasks hold for those tasks that are assigned but cannot be actively managed yet. For now they include two scenarios: a) tasks from unknown sub-topologies and hence cannot be initialized, b) tasks that are pending for being recycled from active to standby and vice versa. Note case b) would be added in a follow-up PR.
Extract any logic that mutates a task out of the Tasks / TaskCreators. Tasks should only be a place for maintaining the set of tasks, but not for manipulations of a task; and TaskCreators should only be used for creating the tasks, but not for anything else. These logic are all migrated into TaskManger.
While doing 2) I noticed we have a couple of minor issues in the code where we duplicate the closing logics, so I also cleaned them up in the following way:
a) When closing a task, we first trigger the corresponding closeClean/Dirty function; then we remove the task from Tasks bookkeeping, and for active task we also remove its task producer if EOS-V1 is used.
b) For closing dirty, we swallow the exception from close call and the remove task producer call; for closing clean, we store the thrown exception from either close call or the remove task producer, and then rethrow at the end of the caller. The difference though is that, for the exception from close call we need to retry close it dirty; for the exception from the remove task producer we do not need to re-close it dirty.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Override the default handler for stream threads if the stream's handler is used. We do no want the java default handler triggering when a thread is replaced.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
* Need to check enforceRestoreActive / transitToUpdateStandby when resuming a paused task.
* Do not expose another getResumedTasks since I think its caller only need the getPausedTasks.
Reviewers: Bruno Cadonna <cadonna@apache.org>
* Add pause action to task-updater.
* When removing a task, also check in the paused tasks in addition to removed tasks.
* Also I realized we do not check if tasks with the same id are added, so I add that check in this PR as well.
Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit changes the version check from != to > as the process method
works correctly on both version 1 and 2. != incorrectly throws on v1
records.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group.
Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: David Jacot <djacot@confluent.io>
Reviewers: David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
1. Added more unit test for RocksDBTimeOrderedSessionStore and RocksDBTimeOrderedSessionSegmentedBytesStore
2. Disable cache for sliding window if emit strategy is ON_WINDOW_CLOSE
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
KIP-770 introduced a performance regression and needs some re-design.
Needed to resolve some conflict while reverting.
This reverts commits 1317f3f77a and 0924fd3f9f.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Guozhang Wang <guozhang@confluent.io>
When store changelog reader is called by a different thread than the stream thread, it can no longer use the main consumer to get committed offsets since consumer is not thread-safe. Instead, we would remove main consumer and leverage on the existing admin client to get committed offsets.
Reviewers: Bruno Cadonna <cadonna@apache.org>
The call to Task#completeRestoration calls methods on the main consumer.
The state updater thread should not access the main consumer since the
main consumer is not thread-safe. Additionally, Task#completeRestoration
changed the state of active tasks, but we decided to keep task life cycle
management outside of the state updater.
Task#completeRestoration should be called by the stream thread on
restored active tasks returned by the state udpater.
Reviewer: Guozhang Wang <guozhang@apache.org>
Currently the tests fail because there is a missing predicate in the retrievableException which causes the test to fail, i.e. the current predicates
containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
containsString("The state store, source-table, may have migrated to another instance"),
containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING")
wasn't complete. Another one needed to be added, namely "The specified partition 1 for store source-table does not exist.". This is because its possible for
assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
or
assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
(depending on which branch) to be thrown, i.e. see
org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist.
at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:223)
at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.retryUntil(StoreQueryIntegrationTest.java:579)
at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:186)
This happens when the stream hasn't been initialized yet. I have run the test around 12k times using Intellij's JUnit testing framework without any flaky failures. The PR also does some minor refactoring regarding moving the list of predicates into their own functions.
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewer: Bruno Cadonna <cadonna@apache.org>
1. As titled, fix the right constructor param ordering.
2. Also added a few more loglines.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
Before this PR the calls to the static methods on StreamsMetricsImpl were just calls and not a verification on the mock. This miss happened during the switch from EasyMock to Mockito.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
For most tests we would need an auto-ticking mock timer to work with draining-with-timeout functions.
For tests that check for never checkpoint we need no auto-ticking timer to control exactly how much time elapsed.
Reviewers: Bruno Cadonna <cadonna@apache.org>
* Add a new API for session windows to range query session window by end time (KIP related).
* Augment session window aggregator with emit strategy.
* Minor: consolidated some dup classes.
* Test: unit test on session window aggregator.
Reviewers: Guozhang Wang <wangguoz@gmail.com>