The state updater code path introduced allocation and synchronization
overhead by performing relatively heavy operations in every iteration of
the StreamThread loop. This includes various allocations and acquiring
locks for handling `removedTasks` and `failedTasks`, even if the
corresponding queues are empty.
This change introduces `hasRemovedTasks` and
`hasExceptionsAndFailedTasks` in the `StateUpdater` interface that
can be used to skip over any allocation or synchronization. The new
methods do not require synchronization or memory allocation.
This change increases throughput by ~15% in one benchmark.
We extend existing unit tests to cover the slightly modified
behavior.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The streams upgrade system inserted FK join code for every version of the
the StreamsUpgradeTest except for the latest. Also, the original code
never switched on the `test.run_fk_join` flag for the target version of
the upgrade.
The effect was that FK join upgrades were not tested at all, since
no FK join code was executed after the bounce in the system test.
We introduce `extra_properties` in the system tests, that can be used
to pass any property to the upgrade driver, which is supposed to be
reused by system tests for switching on and off flags (e.g. for the
state restoration code).
Reviewers: Alex Sorokoumov <asorokoumov@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
The state updater can enter a busy polling loop if it
only updates standby tasks. We need to use the user-provided
poll-time to update always when using the state updater, since
the only other place where the state update blocks
(inside `waitIfAllChangelogsCompletelyRead`) is also
not blocking if there is at least one standby task.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
Improves logs withing Streams by replacing timestamps to date instances to improve readability.
Approach - Adds a function within common.utils.Utils to convert a given long timestamp to a date-time string with the same format used by Kafka's logger.
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
The ChangelogReader starts of in `ACTIVE_RESTORING` state, and
then goes to `STANDBY_RESTORING` when changelogs from standby
tasks are added. When the last standby changelogs are removed,
it remained in `STANDBY_RESTORING`, which means that an empty
ChangelogReader could be in either `ACTIVE_RESTORING` or
`STANDBY_RESTORING` depending on the exact sequence of
add/remove operations. This could lead the state updater into
an illegal state. Instead of changing the state updater,
I propose to stengthen the state invariant of the
`ChangelogReader` slightly: it should always be in
`ACTIVE_RESTORING` state, when empty.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
State stores are initialized from the StreamThread even when
the state updater thread is enabled. However, we were missing
the corresponding handling of exceptions when thrown directly
during the initialization. In particular, TaskCorruptedException
would directly fall through to runLoop, and the task
would fall out of book-keeping, since the exception is thrown
when neither the StreamThread nor the StateUpdater is owning
the task.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
This PR build on top of #11017. I have added the previous author's comment in this PR for attribution. I have also addressed the pending comments from @chia7712 in this PR.
Notes to help the reviewer:
Mockito has mockStatic method which is equivalent to PowerMock's method.
When we run the tests using @RunWith(MockitoJUnitRunner.StrictStubs.class) Mockito performs a verify() for all stubs that are mentioned, hence, there is no need to explicitly verify the stubs (unless you want to verify the number of times etc.). Note that this does not work for static mocks.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Walker Carlson <wcarlson@confluent.io>, Bill Bejeck <bbejeck@apache.org>
Quickstart examples didn't produce any console output, since it was missing a logger implementation in the classpath.
Also some minor cleanups.
Tested by creating a test project and running the code.
PR implementing KIP-770 (#11424) was reverted as it brought in a regression wrt pausing/resuming the consumer. That KIP also introduced a change to deprecate config CACHE_MAX_BYTES_BUFFERING_CONFIG and replace it with STATESTORE_CACHE_MAX_BYTES_CONFIG.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
In two situations, the current code could transition the ChangelogReader
to UpdateStandby when already in that state, causing an IllegalStateException.
Namely these two cases are:
1. When only standby tasks are restoring and one of them crashes.
2. When only standby tasks are restoring and one of them is paused.
This change fixes both issues by only transitioning if the paused or
failed task is an active task.
Reviewer: Bruno Cadonna <cadonna@apache.org>
StreamThread in state PARTITIONS_ASSIGNED was running in
a busy loop until restoration is finished, stealing CPU
cycles from restoration.
Make sure the StreamThread uses poll_time when
state updater is enabled, and we are in state
PARTITIONS_ASSIGNED.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The original code path paused the main consumer for
all tasks before entering the restoration section
of the code, and then resumed all after restoration
has finished.
In the new state updater part of the code, tasks that
do not require restoration skip the restoration completely.
They remain with the TaskManger and are never transferred
to the StateUpdater, and thus are never resumed.
This change makes sure that tasks that remain with the
TaskManager are not paused.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
0. Add name to task executors.
1. DefaultTaskManager implementation, for interacting with the TaskExecutors and support add/remove/lock APIs.
2. Related unit tests.
0. Address comments from P1.
1. Add the DefaultTaskExecutor implementation class.
2. Related DefaultTaskExecutorTest.
Pending in future PRs: a) exception handling, primarily to send them to polling thread, b) light-weight task flushing procedure.
Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.
Reviewers: Nigel Liang <nigel@nigelliang.com>, Kalpesh Patel <kpatel@confluent.io>, John Roesler <vvcephei@apache.org>, Tom Bentley <tbentley@redhat.com>
WindowedStore and SessionStore do not implement a strict retention time in general. We should consider to make retention time strict: even if we still have some record in the store (due to the segmented implementation), we might want to filter expired records on-read. This might benefit PAPI users.
This PR, adds the filtering behaviour in the Metered store so that, it gets automatically applied for cases when a custom state store is implemented
Reviewer: Luke Chen <showuon@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <mjsax@apache.org>
The interfaces (and their future impls) are added under the processor/internals/tasks package, to distinguish with the existing old classes:
1. TaskExecutor is the interface for a processor thread. It takes at most one task to process at a given time from the task manager. When being asked from the task manager to un-assign the current processing task, it will stop processing and give the task back to task manager.
2. TaskManager schedules all the active tasks to assign to TaskExecutors. Specifically: 1) when a task executor ask it for an unassigned task to process (assignNextTask), it will return the available task based on its scheduling algorithm. 2) when the task manager decides to commit (all) tasks, or when a rebalance event requires it to modify the maintained active tasks (via onAssignment), it will lock all the tasks that are going to be closed / committed, asking the TaskExecutor to give them back if they were being processed at the moment.
Reviewers: John Roesler <vvcephei@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Updating the input partitions of tasks also updates the mapping from
source nodes to input topics in the processor topology within the task.
The mapping is updated with the topics from the topology metadata.
The topology metadata does not prefix intermediate internal topics with
the application ID. Thus, if a standby task has input partitions from an
intermediate internal topic the update of the mapping in the processor
topology leads to an invalid topology exception during recycling of a
standby task to an active task when the input queues are created. This
is because the input topics in the processor topology and the input
partitions of the task do not match because the former miss the
application ID prefix.
The added verification to only update input partitions of standby tasks
if they really changed avoids the invalid topology exception if the
standby task only has input partitions from intermediate internal
topics since they should never change. If the standby task has input
partitions from intermediate internal topics and external topics
subscribed to via a regex pattern, the invalid topology exception
might still be triggered.
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
When building streams metadata we want to build even if the host is empty as it is a common way to find the other host addresses
Reviewers: John Roesler <vvcephei@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Batch 4 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
When a topology is paused / resumed, we also need to pause / resume its corresponding tasks inside state updater.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
When the state updater only contains standby tasks and then a
standby task is removed, an IllegalStateException is thrown
because the changelog reader does not allow to switch to standby
updating mode more than once in a row.
This commit fixes this bug by checking that the removed task is
an active one before trying to switch to standby updating mode.
If the task to remove is a standby task then either we are already
in standby updating mode and we should not switch to it again or
we are not in standby updating mode which implies that there are
still active tasks that would prevent us to switch to standby
updating mode.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
This PR makes the following changes:
* Moves the only test in StateRestorationIntegrationTest into RestoreIntegrationTest
* Deletes StateRestorationIntegrationTest
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR is part of a series implementing the self-join rewriting. As part of it, we decided to clean up the TOPOLOGY_OPTIMIZATION_CONFIG and make it a list of optimization rules. Acceptable values are: NO_OPTIMIZATION, OPTIMIZE which applies all optimization rules or a comma separated list of specific optimizations.
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
Transforms the integration test that verifies restoration in a
parametrized test. The parametrized test runs once with
state updater enabled and once with state updater disabled.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
Registering and unregistering the changelog topics in the
changelog reader outside of the state updater leads to
race conditions between the stream thread and the state
updater thread. Thus, this PR moves registering and
unregistering of changelog topics in the changelog
reader into the state updater if the state updater
is enabled.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
In the first attempt to handle revoked tasks in the state updater
we removed the revoked tasks from the state updater and added it to
the set of pending tasks to close cleanly. This is not correct since
a revoked task that is immediately reassigned to the same stream thread
would neither be re-added to the state updater nor be created again.
Also a revoked active task might be added to more than one bookkeeping
set in the tasks registry since it might still be returned from
stateUpdater.getTasks() after it was removed from the state updater.
The reason is that the removal from the state updater is done
asynchronously.
This PR solves this issue by introducing a new bookkeeping set
in the tasks registry to bookkeep revoked active tasks (actually
suspended active tasks).
Additionally this PR closes some testing holes around the modified
code.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
Why
Current set of integration tests leak files in the /tmp directory which makes it cumbersome if you don't restart the machine often.
Fix
Replace the usage of File.createTempFile with existing TestUtils.tempFile method across the test files. TestUtils.tempFile automatically performs a clean up of the temp files generated in /tmp/ folder.
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <mlists@juma.me.uk>
Separates the code path for the new state updater from
the code path of the old restoration.
Ensures that with the state updater tasks are processed
before all tasks are running.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io
Batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
Reviewers: Dalibor Plavcic, Matthew de Detrich <mdedetrich@gmail.com>, Bruno Cadonna <cadonna@apache.org>
1. In state updater, when handling task corrupted exception due to invalid restoring offset, first delete the affected partitions from the checkpoint before reporting it back to the stream thread. This is to mimic the same behavior in stream threads's StateManager#handleCorruption#closeDirtyAndRevive. It's cleaner to do so inside the restore thread, plus it enables us to optimize by only deleting those corrupted partitions, and not all.
2. In the state manager, handle the drained exceptions as follows (this is the same as handling all exceptions from handleAssignment): 1) Task-migrated, throw all the way to stream-thread as handleTaskMigrated, 2) any fatal Streams exception, throw all the way to stream-thread to trigger exception handler, 3) Task-corrupted, throw to the stream-thread as handleCorruption. Note that for 3), we would specially distinguish if the corrupted-tasks are already closed (when they are thrown from handleAssignment or not (when they are thrown from the state updater).
Reviewers: Bruno Cadonna <cadonna@apache.org>
Based on a patch submitted to the confluentinc fork & then abandoned. Needed some updates and minor expansion but more or less just re-applied the changes proposed in confluentinc#697.
Original PR has a very detailed justification for these changes but the tl;dr of it is that apparently the PriorityQueue's iterator does not actually guarantee to return elements in priority order.
Reviewer: Luke Chen <showuon@gmail.com>
Changelogs are already unregistered when tasks are closed.
There is no need to also unregister them in the state
updater.
In future, when we will only have the state updater without
the old code path, we should consider registering and
unregistering the changelogs within the state updater.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
When the task manager is shutdown, the state updater should also
shutdown. After the shutdown of the state updater, the tasks
in its output queues should be closed.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
Changes:
- Migrate to Mockito
- Add more assertive checks using verify
- Minor indentation fixes
Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
The state updater removes its updating and paused task on shutdown.
The removed tasks are added to the output queue for removed tasks.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Once the state updater restored an active task it puts it
into an output queue. The stream thread reads the restored
active task from the output queue and after it verified
that the task is still owned by the stream thread it transits
it to RUNNING.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration `auto.include.jmx.reporter` that can be set to false to disable the JMX Reporter. This configuration is deprecated and will be removed in the next major version.
Reviewers: Tom Bentley <tbentley@redhat.com>, Christo Lolov <christo_lolov@yahoo.com>
Currently the task manager stores the tasks it manages in an
internally. We recently extracted the code to store and retrieve
tasks into its own class Tasks. However, the task manager creates
the Tasks object internally and during testing of the task
manager we do not have access to it which makes testing the task
manager quite complex.
This commit externalizes the data structure that the task manager
uses to store and rerieve tasks. It introduces the TasksRegistry
interface and lets the Tasks object implementing TasksRegistry.
The Tasks object is passed into the task manager via its
constructor. Passing the TasksRegistry dependency to the task
manager from outside faciliates simpler testing of the task
manager.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Removes tasks from the state updater when the input partitions of the tasks are revoked or partitions are lost during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch fixes another incorrect version check in the FK code and adds unit tests that would have caught this bug.
Reviewers: John Roesler <vvcephei@apache.org>
Compilation is failing after these two commits:
```
> Task :streams:compileJava
/Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
tasks.addPendingTaskToClose(restoringTask.id());
^
symbol: method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
1 error
```
Also here:
```
[2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
[2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in class TaskManagerTest
[2022-08-17T20:58:20.912Z] private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
```
This patch reverts them.
Reviewers: Ismael Juma <ismael@juma.me.uk>