This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (as opposed to just the window stores covered there).
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
We attempt to update lags when in state PENDING_SHUTDOWN or PARTITIONS_REVOKED. In these states,
however, our representation of the assignment may not be up-to-date with the subscription
object inside the consumer. This can cause a bug, in particular, when we subscribe to a
set of topics via a regular expression, and the underlying topic is deleted. The consumer
subscription may reflect that topic deletion already, while our internal state still
contains references to the deleted topic, because `onAssignment` has not yet been
executed. Therefore, we will attempt to call `currentLag` on partitions that are not
assigned to us any more inside the consumer, leading to an `IllegalStateException`.
This bug causes flakiness of the test
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Before #14648, the KStreamImplJoin class would always create non-timestamped persistent windowed stores. After that PR, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.
To fix this, we allow creating non-timestamped stores using the DslWindowParams.
Testing was done both manually and by adding a unit test to ensure that the stores created are not timestamped. I also confirmed that the only place in the code where persistentWindowStore was used before #14648 was in the StreamJoined code.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
We are not properly closing Closeable resources in multiple places in the code base, especially when an exception occurs. This change fixes several of these leaks.
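As a generic illustration of the kind of fix involved (not code from this change), try-with-resources guarantees the resource is closed even on the exceptional path:

```java
import java.io.FileInputStream;
import java.io.IOException;

class ResourceHandling {
    static int readFirstByte(final String path) throws IOException {
        // try-with-resources closes the stream even when read() throws,
        // which is the kind of leak this change fixes in multiple places.
        try (final FileInputStream in = new FileInputStream(path)) {
            return in.read();
        }
    }
}
```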
Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
Kafka Streams checkpoints the wrong offset when a task is closed during
restoration. If under exactly-once processing guarantees a
TaskCorruptedException happens, the affected task is closed dirty, its
state content is wiped out and the task is re-initialized. If during
the following restoration the task is closed cleanly, the task writes
the offsets that it stores in its record collector to the checkpoint
file. Those offsets are the offsets that the task wrote to the changelog
topics. In other words, the task writes the end offsets of its changelog
topics to the checkpoint file. Consequently, when the task is
initialized again on the same Streams client, the checkpoint file is
read and the task assumes it is fully restored although the records
between the last offsets the task restored before closing clean and
the end offset of the changelog topics are missing locally.
The fix is to clear the offsets in the record collector on close.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-14412: Generalise over RocksDB WriteBatch
The type hierarchy of RocksDB's `WriteBatch` looks like this:
```
+---------------------+
| WriteBatchInterface |
+---------------------+
^
|
+---------------------+
| AbstractWriteBatch |
+---------------------+
^
|
+----------+----------+
| |
+------------+ +---------------------+
| WriteBatch | | WriteBatchWithIndex |
+------------+ +---------------------+
```
By switching our `BatchWritingStore` methods from `WriteBatch` to
`WriteBatchInterface`, we enable the use of `WriteBatchWithIndex` as
well.
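A minimal sketch of the generalization (the method name is assumed for illustration, not the actual `BatchWritingStore` API):

```java
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchInterface;

// Accepting the common supertype means callers can pass either a plain
// WriteBatch or a WriteBatchWithIndex.
interface BatchWritingStore {
    void addToBatch(byte[] key, byte[] value, WriteBatchInterface batch) throws RocksDBException;
}

class ExampleBatchWritingStore implements BatchWritingStore {
    @Override
    public void addToBatch(final byte[] key, final byte[] value, final WriteBatchInterface batch)
        throws RocksDBException {
        // put(byte[], byte[]) is declared on WriteBatchInterface, so this
        // compiles against both subtypes.
        batch.put(key, value);
    }
}
```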
* Improve error reporting for unknown batch type
We should be using an `IllegalStateException`, and we should log a
message informing the user that this is a bug.
This branch should be unreachable, as both of the possible
implementations of `WriteBatchInterface` are matched in the previous two
branches.
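A sketch of the dispatch with the improved error reporting (the exact shape of the real method is assumed):

```java
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchInterface;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

class BatchWriter {
    static void write(final RocksDB db, final WriteOptions options, final WriteBatchInterface batch)
        throws RocksDBException {
        if (batch instanceof WriteBatch) {
            db.write(options, (WriteBatch) batch);
        } else if (batch instanceof WriteBatchWithIndex) {
            db.write(options, (WriteBatchWithIndex) batch);
        } else {
            // Unreachable: both implementations of WriteBatchInterface are
            // matched above. Reaching this branch means a bug.
            throw new IllegalStateException(
                "Unknown type of batch " + batch.getClass().getCanonicalName() +
                ". This is a bug in Kafka Streams; please file a bug report.");
        }
    }
}
```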
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This patch bumps the next release version to 3.8.0-SNAPSHOT.
Following the Release Process, I created the 3.7 branch and am following the steps to bump these versions:
Modify the version in trunk to bump to the next one (e.g. "0.10.1.0-SNAPSHOT") in the following files:
docs/js/templateData.js
gradle.properties
kafka-merge-pr.py
streams/quickstart/java/pom.xml
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
streams/quickstart/pom.xml
tests/kafkatest/__init__.py
In many parameterized tests, the display name is broken. For example, testMetadataFetch appears as [1] true, [2] false.
This is because @ParameterizedTest defaults to the constant:
String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";
This PR adds a new junit-platform.properties which overrides the default to include {displayName}, showing the display name of the method.
Existing tests which override the name should work as is. The precedence rules are:
1. name attribute in @ParameterizedTest, if present
2. value of the junit.jupiter.params.displayname.default configuration parameter, if present
3. DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest
Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names
Sample test run output
Before: [1] true
After: testMetadataExpiry(boolean).false
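For illustration, the override could look like the following; the exact property value is an assumption inferred from the before/after output above:

```properties
# src/test/resources/junit-platform.properties (path assumed)
junit.jupiter.params.displayname.default = {displayName}.{argumentsWithNames}
```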
This commit is an extension of bdf6d46b41, which had to be reverted because it introduced test failures.
Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Part of KIP-714.
Add support to collect client instance id of the global consumer.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This reverts commit bdf6d46b41. We found out that this commit introduced flakiness in Streams' tests. We will revise it.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Part of KIP-714.
Adds support to expose main consumer client instance id.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Minor cleanup to make it easier to follow the restore listener logic. Currently, the KafkaStreams class tracks two restore listener fields: there is a non-final, nullable "globalRestoreListener" that holds the restore listener specified by the user (if any), and then there is a final "delegatingRestoreListener" that's used to encapsulate the null checks for the user-specified restore listener. It's a bit confusing to follow along with what each of these restore listener fields is doing and the relationship between them when they're on equal footing like this, when in reality they're more hierarchical and the DelegatingRestoreListener is actually a wrapper over the user-specified globalRestoreListener. The term "global" is also a bit misleading as it can get mixed up with global state stores, when it's really meant to be "global" in the sense that it applies to all state stores in the application.
It would be nice to just move the user listener completely inside the DelegatingRestoreListener class and then make that class static, as well as renaming the field to "userRestoreListener"
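A minimal sketch of that proposed shape, with names taken from the description above (not necessarily the final code):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Static wrapper that owns the (nullable) user listener and encapsulates
// all null checks in one place.
class DelegatingRestoreListener implements StateRestoreListener {
    private volatile StateRestoreListener userRestoreListener;

    void setUserRestoreListener(final StateRestoreListener listener) {
        this.userRestoreListener = listener;
    }

    @Override
    public void onRestoreStart(final TopicPartition tp, final String storeName,
                               final long startingOffset, final long endingOffset) {
        if (userRestoreListener != null) {
            userRestoreListener.onRestoreStart(tp, storeName, startingOffset, endingOffset);
        }
    }

    @Override
    public void onBatchRestored(final TopicPartition tp, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        if (userRestoreListener != null) {
            userRestoreListener.onBatchRestored(tp, storeName, batchEndOffset, numRestored);
        }
    }

    @Override
    public void onRestoreEnd(final TopicPartition tp, final String storeName,
                             final long totalRestored) {
        if (userRestoreListener != null) {
            userRestoreListener.onRestoreEnd(tp, storeName, totalRestored);
        }
    }
}
```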
Reviewers: Matthias J. Sax <mjsax@apache.org>
Implementation for KIP-988, adds the new StandbyUpdateListener interface
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Colt McNealy <colt@littlehorse.io>
Relax non-null FK left-join requirement.
Testing Strategy: Inject extractor which returns null on first or second element.
Reviewers: Walker Carlson <wcarlson@apache.org>
- Part of KIP-714
- Add new configs and public API for Kafka Streams
- Implement support to get admin client instance id
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Apoorv Mittal <amittal@confluent.io>, Walker Carlson <wcarlson@confluent.io>
RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted does not wait to ensure that test-topic-A is deleted. The second assignment condition times out after 15 seconds.
We should wait for the topic to be deleted (default timeout = 30 seconds) and then check the assignment.
Reviewers: Walker Carlson <wcarlson@apache.org>
When opening RocksDB, we were checking for an error in
`RocksDBTimestampedStore` to detect if the `keyValueWithTimestamp` CF is
missing.
The `openRocksDB` method now supports any number of column families, not
just the extra one used by `RocksDBTimestampedStore`. We now check for
the existing column families _before_ opening the database, which allows
us to create any missing column families.
Supporting automatic creation of any number of missing column families
is a pre-requisite for KIP-892: Transactional StateStores.
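A hedged sketch of the check-then-create approach; the helper shape is hypothetical, only the RocksDB calls are real API:

```java
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ColumnFamilyOpener {
    // Opens the DB with the default CF plus any number of extra CFs,
    // creating the ones that do not exist on disk yet.
    static RocksDB openWithColumnFamilies(final DBOptions dbOptions,
                                          final String path,
                                          final List<byte[]> extraCfNames,
                                          final List<ColumnFamilyHandle> handlesOut) throws RocksDBException {
        // Check which column families already exist *before* opening.
        final List<byte[]> existing;
        try (final Options options = new Options()) {
            existing = RocksDB.listColumnFamilies(options, path);
        }

        final Set<String> requested = new HashSet<>();
        final List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
        requested.add(new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.UTF_8));
        for (final byte[] name : extraCfNames) {
            descriptors.add(new ColumnFamilyDescriptor(name));
            requested.add(new String(name, StandardCharsets.UTF_8));
        }
        // open() must be given every CF that already exists on disk, so merge
        // in any on-disk CFs the caller did not request.
        for (final byte[] name : existing) {
            if (!requested.contains(new String(name, StandardCharsets.UTF_8))) {
                descriptors.add(new ColumnFamilyDescriptor(name));
            }
        }

        // Any requested CF that is missing on disk is created on open.
        dbOptions.setCreateMissingColumnFamilies(true);
        return RocksDB.open(dbOptions, path, descriptors, handlesOut);
    }
}
```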
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
This pull request is an attempt to get what has started in #12524 to completion as part of the Streams project migration to Mockito.
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
The following race can happen in the state updater code path:
- Task is restoring, owned by the state updater
- We fall out of the consumer group and lose all partitions
- We therefore register a "TaskManager.pendingUpdateAction" to CLOSE_DIRTY
- We also register a "StateUpdater.taskAndAction" to remove the task
- We get the same task reassigned; since it's still owned by the state updater, we don't do much
- The task completes restoration
- The "StateUpdater.taskAndAction" to remove will be ignored, since the task is already restored
- Inside "handleRestoredTasksFromStateUpdater", we close the task dirty because of the pending update action
- We now have the task assigned, but it's closed.
To fix this particular race, we cancel the "close" pending update action. Furthermore, since we may have made progress in other threads during the missed rebalance, we need to add the task back to the state updater, to at least check if we are still at the end of the changelog. Finally, it seems we do not need to close dirty here, it's enough to close clean when we lose the task, related to KAFKA-10532.
This should fix the flaky EOSIntegrationTest.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implements KIP-992.
Adds TimestampedKeyQuery and TimestampedRangeQuery (IQv2) for timestamped key-value stores, and changes the semantics of the existing KeyQuery and RangeQuery when issued against a timestamped key-value store: they now unwrap the ValueAndTimestamp and return only the plain value.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This interface provides a common supertype for `StreamThread` and
`DefaultTaskExecutor.TaskExecutorThread`, which will be used by KIP-892
to differentiate between "processing" threads and interactive query
threads.
This is needed because `DefaultTaskExecutor.TaskExecutorThread` is
`private`, so cannot be seen directly from `RocksDBStore`.
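A sketch of the idea; the interface name and the instanceof usage are illustrative, not necessarily the actual code:

```java
// Common supertype for StreamThread and DefaultTaskExecutor.TaskExecutorThread.
public interface ProcessingThread {
    // marker interface: no methods are needed just to distinguish thread kinds
}

// Illustrative check from store code such as RocksDBStore: since
// TaskExecutorThread is private, the store can only test against the
// public marker interface.
class ThreadKindCheck {
    static boolean isProcessingThread() {
        return Thread.currentThread() instanceof ProcessingThread;
    }
}
```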
Reviewer: Bruno Cadonna <cadonna@apache.org>
Introduce a dummy node connected to every other node and run Bellman-Ford from the dummy node once, instead of from every node in the graph.
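This is the same trick used in Johnson's algorithm: a virtual source with zero-weight edges to every node is equivalent to initializing all distances to zero, so one Bellman-Ford pass computes what previously required a run per node. A sketch under an assumed edge-list representation (not the actual graph-solver code):

```java
import java.util.Arrays;
import java.util.List;

final class DummySourceBellmanFord {
    /**
     * Shortest distances from an implicit dummy node that has a zero-weight
     * edge to every real node. Each edge is {from, to, weight}.
     */
    static long[] distancesFromDummySource(final int numNodes, final List<int[]> edges) {
        final long[] dist = new long[numNodes];
        // dist[v] = 0 for all v encodes the zero-weight edges from the dummy node.
        Arrays.fill(dist, 0L);
        // At most numNodes relaxation rounds (the graph incl. dummy has numNodes + 1 nodes).
        for (int round = 0; round < numNodes; round++) {
            boolean changed = false;
            for (final int[] e : edges) {
                if (dist[e[0]] + e[2] < dist[e[1]]) {
                    dist[e[1]] = dist[e[0]] + e[2];
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged early
            }
        }
        return dist;
    }
}
```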
Reviewers: Qichao Chu (@ex172000), Matthias J. Sax <matthias@confluent.io>
Named topologies is a feature that is planned to be removed from AK with 4.0 and was never used via the public interface. It was used in a few versions of KSQL only, but was disabled there as well. While we do not want to remove it in 3.7 yet, we should disable flaky tests in that feature that are disruptive to AK development.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implementation for KIP-954: support custom DSL store providers
Testing Strategy:
- Updated the topology tests to ensure that the configuration is picked up in the topology builder
- Manually built a Kafka Streams application using a custom DslStoreSuppliers class and verified that it was used
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang.wang.us@gmail.com>
The state updater waits on a condition variable if no tasks exist that need to be updated. The condition variable is wrapped by a loop to account for spurious wake-ups, and the check whether updating tasks exist is done in the loop condition. However, only the state updater thread itself can change whether updating tasks exist; while it is waiting on the condition variable, the check will always return the same value. Thus, the check only needs to be done once, before entering the loop.
This commit moves the check before the loop, which also makes the usage of mocks more robust since the processing becomes more deterministic.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
A few variables are no longer used but still exist. This commit removes those unused variables.
Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:
* `KafkaConsumer` (AKA "existing", "legacy" consumer)
* `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)
The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first:
* `LegacyKafkaConsumer`
* `AsyncKafkaConsumer`
`LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use.
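A minimal sketch of the delegation pattern; the config key used to pick the delegate and the constructor shapes are assumptions, not the actual implementation:

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// The top-level KafkaConsumer keeps its public type, but forwards every
// call to one of the two internal delegates.
public class KafkaConsumerSketch<K, V> {
    private final Consumer<K, V> delegate;

    public KafkaConsumerSketch(final Map<String, Object> configs) {
        // "group.protocol" as the selector is an assumption based on KIP-848.
        final String protocol = String.valueOf(configs.getOrDefault("group.protocol", "classic"));
        delegate = "consumer".equalsIgnoreCase(protocol)
            ? newAsyncKafkaConsumer(configs)    // implements the new KIP-848 protocol
            : newLegacyKafkaConsumer(configs);  // renamed existing KafkaConsumer logic
    }

    public ConsumerRecords<K, V> poll(final Duration timeout) {
        return delegate.poll(timeout); // every public method forwards like this
    }

    // Factory stubs standing in for the internals classes named above.
    private Consumer<K, V> newAsyncKafkaConsumer(final Map<String, Object> configs) {
        throw new UnsupportedOperationException("sketch only");
    }

    private Consumer<K, V> newLegacyKafkaConsumer(final Map<String, Object> configs) {
        throw new UnsupportedOperationException("sketch only");
    }
}
```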
This task is part of the work to implement support for the new KIP-848 consumer group protocol.
Reviewers: Philip Nee <pnee@confluent.io>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
This patch fixes the compilation error for JDK 21 introduced in https://github.com/apache/kafka/pull/14708.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>
This is a follow-up from #14659 that ports the windowed classes to use the StoreFactory abstraction as well. A side benefit is that the materialization code is no longer duplicated for each StreamImpl/CogroupedStreamImpl class.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR #8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor.
This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
When a task is corrupted, uncorrupted tasks are committed. That is also true for standby tasks. Committing standby tasks actually means that they are checkpointed.
When the state updater is enabled, standbys are owned by the state updater. The stream thread should not checkpoint them when handling corrupted tasks. That is not a big limitation since the state updater periodically checkpoints standbys anyway. Additionally, when handling corrupted tasks the important thing is to commit active running tasks to abort the transaction. Committing standby tasks does not abort any transaction.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and dsl.store.suppliers.class (introduced in #14648) without requiring them to be passed into StreamsBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...)).
It essentially makes StoreBuilder an external-only API; internally, the StoreFactory equivalent is used. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called. (In a future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes.)
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)
The key requirements got relaxed for the following Streams DSL operators:
- left join KStream-KStream: no longer drop left records with null key; call ValueJoiner with 'null' for right value.
- outer join KStream-KStream: no longer drop left/right records with null key; call ValueJoiner with 'null' for right/left value.
- left-foreign-key join KTable-KTable: no longer drop left records with null foreign key returned by the ForeignKeyExtractor; call ValueJoiner with 'null' for right value.
- left join KStream-KTable: no longer drop left records with null key; call ValueJoiner with 'null' for right value.
- left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null'; call ValueJoiner with 'null' for right value.
Reviewers: Walker Carlson <wcarlson@apache.org>
Part of KIP-985.
Updates JavaDocs for `RangeQuery` and `ReadOnlyKeyValueStore` with regard to ordering guarantees.
Updates Kafka Streams upgrade guide with KIP information.
Reviewer: Matthias J. Sax <matthias@confluent.io>
Trying to fix flakiness for the shouldInvokeUserDefinedGlobalStateRestoreListener test introduced in #14519.
Fixes are:
- Do not use static membership.
- Always close the 2nd KafkaStreams instance.
- Wait for the Kafka Streams instance to transition to a RUNNING state before proceeding.
- Added logging for the StateRestoreListener implementation.
- Reduce restore consumer MAX_POLL_RECORDS.
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
After finishing restoration, we should only log the active tasks. Standby tasks are not part of restoration and it can be confusing to see them show up on this log message.
Reviewers: Matthias J. Sax <matthias@confluent.io>
- Introduce a new internal config flag to enable processing threads
- If enabled, create a scheduling task manager inside the normal task manager (renamings will be added on top of this), and use it from the stream thread
- All operations inside the task manager that change task state, lock the corresponding tasks if processing threads are enabled.
- Adds a new abstract class AbstractPartitionGroup. We can modify the underlying implementation depending on the synchronization requirements. PartitionGroup is the unsynchronized subclass that is going to be used by the original code path. The processing thread code path uses a trivially synchronized SynchronizedPartitionGroup that uses object monitors. Further down the road, there is the opportunity to implement a weakly synchronized alternative. The details are complex, but since the implementation is essentially a queue + some other things, it should be feasible to implement this lock-free.
- Refactorings in StreamThreadTest: Make all tests use the thread member variable and add tearDown in order to avoid thread leaks and simplify debugging. Make the test parameterized on two internal flags: state updater enabled and processing threads enabled. Use JUnit's assume to disable all tests that do not apply.
Enable some integration tests with processing threads enabled.
Reviewer: Bruno Cadonna <bruno@confluent.io>
When we get a suspended task re-assigned in the eager rebalance protocol, we have to add the task back to the state updater so that it has a chance to catch up with its change log.
This was prevented by a check in Tasks, which disallows removing SUSPENDED tasks from the task registry. I couldn't find a reason why this must be an invariant of the task registry, so this weakens the check.
The error happens in the integration between TaskRegistry and TaskManager. This change also adds unit tests to more closely specify the intended behavior of the two modules.
Reviewers: Bruno Cadonna <bruno@confluent.io>
We embrace immutability and thus should return a new object instead of
`this`, similar to other config classes we use in the DSL.
Also some side cleanup of JavaDocs for a bunch of classes.
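For illustration, the immutable pattern looks like this (class and fields hypothetical):

```java
// Instead of mutating and returning `this`, each "setter" returns a new
// immutable instance, matching the other DSL config classes.
public final class ExampleConfig {
    private final String name;
    private final long gracePeriodMs;

    private ExampleConfig(final String name, final long gracePeriodMs) {
        this.name = name;
        this.gracePeriodMs = gracePeriodMs;
    }

    public ExampleConfig withName(final String newName) {
        return new ExampleConfig(newName, gracePeriodMs);
    }

    public ExampleConfig withGracePeriodMs(final long newGracePeriodMs) {
        return new ExampleConfig(name, newGracePeriodMs);
    }
}
```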
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Spotbugs was temporarily disabled as part of KAFKA-15485 to support the Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0, which adds support for JDK 21, and enables its usage in the build again.
Reviewers: Divij Vaidya <diviv@amazon.com>
When tasks are found corrupted, Kafka Streams tries to commit
the non-corrupted tasks before closing and reviving the corrupted
active tasks. Besides active running tasks, Kafka Streams tries
to commit restoring active tasks and standby tasks. However,
restoring active tasks do not need to be committed since they
do not have offsets to commit and the current code does not
write a checkpoint. Furthermore, trying to commit restoring
active tasks with the state updater enabled results in the
following error:
java.lang.UnsupportedOperationException: This task is read-only
at org.apache.kafka.streams.processor.internals.ReadOnlyTask.commitNeeded(ReadOnlyTask.java:209)
...
since commitNeeded() is not a read-only method for active tasks.
In the future, we can consider writing a checkpoint for active
restoring tasks in this situation. Additionally, we should
fix commitNeeded() in active tasks to be read-only.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Fixes logging for KafkaStreams#streamThreadLeaveConsumerGroup.
To preserve the full stack trace of the exception, the Exception e is passed as the second argument, while the message is pre-formatted and passed as a String first argument. With this, we won't lose the stack trace of the exception.
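The pattern, sketched with an illustrative logger and message:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LeaveGroupLogging {
    private static final Logger log = LoggerFactory.getLogger(LeaveGroupLogging.class);

    void logLeaveError(final String threadName, final Exception e) {
        // Pre-format the message, then pass the exception as the last
        // argument so SLF4J prints the full stack trace.
        final String message = String.format("Could not remove thread %s from consumer group", threadName);
        log.error(message, e);
    }
}
```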
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
With https://issues.apache.org/jira/browse/KAFKA-10575, StateRestoreListener#onRestoreSuspended was added. But local tests show that it is never called, because DelegatingStateRestoreListener was not updated to call the new method.
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>, Bruno Cadonna <cadonna@confluent.io>
Now that the implementation for the state updater is done, we can enable it by default.
This PR enables the state updater by default and fixes code that made assumptions that are not true when the state updater is enabled (mainly tests).
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and requested unassignment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught exception to a task executor
* Improved logging
* Number of threads is controlled from outside of the TaskManager
Reviewers: Bruno Cadonna <bruno@confluent.io>
When Streams completes a rebalance, it unlocks the state directories of
all unassigned tasks. Unfortunately, when the state updater is enabled,
Streams does not look into the state updater to determine the
unassigned tasks.
This commit corrects this by adding the check.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:
* It resumes processing when the PartitionGroup buffers are empty
* It fetches the lag from the consumer
We introduce updateLags() and
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.
Reviewer: Bruno Cadonna <bruno@confluent.io>
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transitions to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.
With the state updater enabled processing is also executed in states
other than RUNNING, and so processing is also executed when the
stream thread is in state PARTITIONS_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.
This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
The state updater can get into a busy loop when all tasks are paused, because the changelogReader will never report that all changelogs have been read completely. Fix this by awaiting on the condition variable if updatingTasks is empty.
Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The state directory throws a lock exception during initialization if a task state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores it, and tries to initialize the task again in the next poll iteration.
In the state updater code path, we missed catching the lock exception when Streams recycles a task. That leads to the lock exception thrown to the exception handler, which is unexpected and leads to test failures.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
LogCaptureAppender sets the log level in various tests to check if a certain log message is produced. The log level is however never reverted, changing the log level across the board and introducing flakiness due to non-determinism since the log level depends on execution order. Some log messages change the timing inside tests significantly.
Reviewer: Bruno Cadonna <cadonna@apache.org>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes was applied (fix 2 is sketched after the list):
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
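For example, fix 2 addresses the new [this-escape] lint like so (a generic illustration, not code from this PR):

```java
class Leaky {
    Leaky() {
        init(); // javac 21: warning [this-escape] — init() is overridable,
                // so a subclass could observe a partially constructed object
    }
    protected void init() { }
}

class Fixed {
    Fixed() {
        init(); // no warning: a final method cannot be overridden
    }
    protected final void init() { }
}
```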
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
All block cache metrics are being multiplied by the total number of
column families. In a `RocksDBTimestampedStore`, we have 2 column
families (the default, and the timestamped values), which causes all
block cache metrics in these stores to become doubled.
The cause is that our metrics recorder uses `getAggregatedLongProperty`
to fetch block cache metrics. `getAggregatedLongProperty` queries the
property on each column family in the database, and sums the results.
Since we always configure all column families to share the same block
cache, that causes the same block cache to be queried multiple times for
its metrics, with the results added together, effectively multiplying
the real value by the total number of column families.
To fix this, we should simply use `getLongProperty`, which queries a
single column family (the default one). Since all column families share
the same block cache, querying just one of them will give us the correct
metrics for that shared block cache.
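In RocksDB's Java API, the difference looks like this ("rocksdb.block-cache-usage" is a standard RocksDB property; the reporting method is illustrative):

```java
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class BlockCacheMetrics {
    static void report(final RocksDB db) throws RocksDBException {
        // Sums the property over every column family: with 2 CFs sharing one
        // block cache, this returns twice the real value.
        final long aggregated = db.getAggregatedLongProperty("rocksdb.block-cache-usage");
        // Queries only the default column family, which is enough because all
        // CFs share the same block cache.
        final long single = db.getLongProperty("rocksdb.block-cache-usage");
        System.out.printf("aggregated=%d single=%d%n", aggregated, single);
    }
}
```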
Note: the same block cache is shared among all column families of a store
irrespective of whether the user has configured a shared block cache
across multiple stores.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Avoid busy waiting for processable tasks. We need to be a bit careful here not to have the task executors sleep when work is available. We have to make sure to signal on the condition variable any time a task becomes "processable". Here are some situations where a task becomes processable:
- Task is unassigned from another TaskExecutor.
- Task state is changed (should only happen when a task is locked inside the polling phase).
- When tasks are unlocked.
- When tasks are added.
- New records available.
- A task is resumed.
In summary:
- We should probably lock tasks when they are paused and unlock them when they are resumed. We should also wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR). We add DefaultTaskManager.signalProcessableTasks for this.
- We need to wake the task executors in DefaultTaskManager.unassignTask, DefaultTaskManager.unlockTasks and DefaultTaskManager.add, as sketched below.
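A condensed sketch of the wait/signal structure (names assumed, not the actual DefaultTaskManager code):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

abstract class TaskSchedulerSketch<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition tasksProcessable = lock.newCondition();

    /** Called whenever a task may have become processable (unassign, unlock, add, resume, new records). */
    void signalProcessableTasks() {
        lock.lock();
        try {
            tasksProcessable.signalAll(); // wake any sleeping task executors
        } finally {
            lock.unlock();
        }
    }

    /** Task executors block here instead of busy-waiting. */
    T awaitNextProcessableTask() throws InterruptedException {
        lock.lock();
        try {
            T task;
            while ((task = nextProcessableTaskOrNull()) == null) {
                tasksProcessable.await();
            }
            return task;
        } finally {
            lock.unlock();
        }
    }

    abstract T nextProcessableTaskOrNull();
}
```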
Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@apache.org>
A mocked method is executed unexpectedly when we enable DEBUG
log level, leading to confusing test failures during debugging.
Since the log message itself seems useful, we adapt the test
to take the additional mocked method call into account.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Resets the value of transactionInFlight to false when closing the
StreamsProducer. This ensures we don't try to commit against a
closed producer.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Preliminary fix for KAFKA-15429 which updates StreamThread.completeShutdown to
catch-and-log errors from consumer.unsubscribe. Though this does not prevent
the exception, it does preserve the original exception that caused the stream
thread to exit.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Implement setting and clearing task timeouts, as well as changing the output on exceptions to make
it similar to the existing code path.
Reviewer: Walker Carlson <wcarlson@apache.org>
Minor fix to avoid creating unnecessary standby tasks, especially when these may be surprising or unexpected as in the case of an application with num.standby.replicas = 0 and warmup replicas disabled.
The "bug" here was introduced during the fix for an issue with cooperative rebalancing and in-memory stores. The fundamental problem is that in-memory stores cannot be unassigned from a consumer for any period, however temporary, without being closed and losing all the accumulated state. This caused some grief when the new HA task assignor would assign an active task to a node based on the readiness of the standby version of that task, but would have to remove the active task from the initial assignment so it could first be revoked from its previous owner, as per the cooperative rebalancing protocol. This temporary gap in any version of that task among the consumer's assignment for that one intermediate rebalance would end up causing the consumer to lose all state for it, in the case of in-memory stores.
To fix this, we simply began to place standby tasks on the intended recipient of an active task awaiting revocation by another consumer. However, the fix was a bit of an overreach, as we assigned these temporary standby tasks in all cases, regardless of whether there had previously been a standby version of that task. We can narrow this down without sacrificing any of the intended functionality by only assigning this kind of standby task where the consumer had previously owned some version of it that would otherwise potentially be lost.
Also breaks up some of the long log lines in the StreamsPartitionAssignor and expands the summary info while moving it all to the front of the line (following reports of missing info due to truncation of long log lines in larger applications)
assertThrows makes the verification of exceptions clearer and more intuitive, improving code readability compared to the annotation approach, which is considered a test smell in the research literature. One possible reason is developers not keeping up to date with recent versions of testing frameworks.
All such patterns in streams have been refactored.
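As a generic illustration of the refactoring (using JUnit 4.13's Assert.assertThrows; the test body is hypothetical):

```java
import static org.junit.Assert.assertThrows;

import org.junit.Test;

public class ExampleTest {

    // Before: the annotation approach passes if the exception is thrown
    // anywhere in the test body, hiding which call actually threw.
    @Test(expected = IllegalStateException.class)
    public void shouldThrowAnnotationStyle() {
        doSomething();
    }

    // After: assertThrows pinpoints the exact call expected to throw and
    // allows further assertions after it.
    @Test
    public void shouldThrowAssertThrowsStyle() {
        assertThrows(IllegalStateException.class, ExampleTest::doSomething);
    }

    private static void doSomething() {
        throw new IllegalStateException("boom");
    }
}
```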
Reviewers: vamossagar12 <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle and to init.
Usually all pending tasks to init are added to the state updater
in the same poll iteration that handles the assignment. However,
if a LockException occurs during the initialization of a task, the task
is re-added to the tasks to init and initialization is retried
in the next poll iteration.
A LockException might occur when a state directory is still locked
by another thread, when the rebalance just happened.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Currently, Kafka Streams only tries to purge records whose
offset are committed from a repartition topic when at
least one offset was committed in the current commit.
The coupling between committing some offsets and purging
records is not needed and might delay purging of records.
For example, if an in-flight call for purging records has not
completed yet when a commit happens, a new call
is not issued.
If then the earlier in-flight call for purging records
finally completes but the next commit does not commit any
offsets, Streams does not issue the call for purging records
whose offset were committed in the previous commit
because the purging call was still in-flight.
This change issues calls for purging records during any commit
if the purge interval passed, even if no offsets were committed
in the current commit.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
KIP-904 introduced a backward incompatible change that requires a 2-bounce rolling upgrade.
The new "3.4" upgrade config value is not recognized by `AssignorConfiguration` though and thus crashed Kafka Streams if use.
Reviewers: Farooq Qaiser <fqaiser94@gmail.com>, Bruno Cadonna <bruno@confluent.io>
Reusing an admin client across tests can cause false positives in leak checkers, so don't do it.
Reviewers: Divij Vaidya <diviv@amazon.com>, Matthias J. Sax <matthias@confluent.io>
Implements punctuation inside processing threads. The scheduler
algorithm checks if a task that is not assigned currently can
be punctuated, and returns it when a worker thread asks for the
next task to be processed. Then, the processing thread runs all
punctuations in the punctuation queue.
Piggy-backed: take TaskExecutionMetadata into account when
processing records.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Part of KIP-925.
- Add more tests for HighAvailabilityTaskAssignor
- Remove null and optional check for RackAwareTaskAssignor
- Pass rack aware assignor configs to getMainConsumerConfigs so that they can be picked up in rebalance protocol
- Change STATELESS_NON_OVERLAP_COST to 0. Setting it to 1 was a mistake; stateless tasks should be moved without this cost.
- Update of existing tests
Reviewers: Matthias J. Sax <matthias@confluent.io>
Change in response to KIP-941.
New PR due to merge issue.
Changes line 57 in the RangeQuery class file from:
```java
public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
    return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
}
```
to:
```java
public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
    return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
}
```
Testing strategy:
Since null values can now be passed to RangeQuery in order to receive full scans, I changed the query-definition logic starting at line 1085 in IQv2StoreIntegrationTest.java from:
```java
final RangeQuery<Integer, V> query;
if (lower.isPresent() && upper.isPresent()) {
    query = RangeQuery.withRange(lower.get(), upper.get());
} else if (lower.isPresent()) {
    query = RangeQuery.withLowerBound(lower.get());
} else if (upper.isPresent()) {
    query = RangeQuery.withUpperBound(upper.get());
} else {
    query = RangeQuery.withNoBounds();
}
```
to:
```java
final RangeQuery<Integer, V> query =
    RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
```
because handling the different combinations of isPresent() on the bounds is no longer necessary.
Reviewers: John Roesler <vvcephei@apache.org>, Bill Bejeck <bbejeck@apache.org>
Part of KIP-925.
- Change TaskAssignor interface to take RackAwareTaskAssignor
- Integrate RackAwareTaskAssignor to StreamsPartitionAssignor and HighAvailabilityTaskAssignor
- Update HAAssignor tests
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias J. Sax <matthias@confluent.io>
Part of KIP-925.
- Add configs for rack aware assignor
- Update standby optimizer in RackAwareTaskAssignor to have more rounds
- Refactor some method in RackAwareTaskAssignorTest to test utils so that they can also be used in HighAvailabilityTaskAssignorTest and other tests
Reviewers: Matthias J. Sax <matthias@confluent.io>
A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle.
There are situations in which a stream thread might only have
standby tasks that are recycled to active task after a rebalance.
In such situations, the stream thread might be faster in checking
active tasks in restoration than the state updater removing the
standby task to recycle from the state updater. If that happens
the stream thread changes to RUNNING although it should wait until
the standby tasks are recycled to active tasks and restored.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Part of KIP-925.
For rack aware standby task assignment, we can either use the already existing "rack tags" or as a fall-back the newly added "rack.id". This PR unifies both without the need to change the actual standby task assignment logic.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Check the history retention of the KTable of the grace-period join.
Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <cadonna@apache.org>
This PR adds CommandDefaultOptions usage like in the other joptsimple based tools. It also moves the associated unit test class from streams to tools module as discussed in #13127 (comment)
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
Catch any exceptions that escape the processing logic
inside TaskExecutors and record them in the TaskManager.
Make sure the TaskExecutor survives, but the task is
unassigned. Add a method to TaskManager to drain the
exceptions. The aim here is that the polling thread will
drain the exceptions to be able to execute the
uncaught exception handler, abort transactions, etc.
Reviewer: Bruno Cadonna <cadonna@apache.org>
With the state updater, the task manager needs also to look into the
tasks owned by the state updater when computing the sum of offsets
of the state. This sum of offsets is used by the high availability
assignor to assign warm-up replicas.
If the task manager does not take into account tasks in the
state updater, a warm-up replica will never report back that
the state for the corresponding task has caught up. Consequently,
the warm-up replica will never be dismissed and probing rebalances
will never end.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
It's good for us to add support for Java 20 in preparation for Java 21 - the next LTS.
Given that Scala 2.12 support has been deprecated, a Scala 2.12 variant is not included.
Also remove some branch builds that add load to the CI, but have
low value: JDK 8 & Scala 2.13 (JDK 8 support has been deprecated),
JDK 11 & Scala 2.12 (Scala 2.12 support has been deprecated) and
JDK 17 & Scala 2.12 (Scala 2.12 support has been deprecated).
A newer version of Mockito (4.9.0 -> 4.11.0) is required for Java 20 support, but we
only use it with Scala 2.13+ since it causes compilation errors with Scala 2.12. Similarly,
we upgrade easymock when the Java version is 16 or newer as it's incompatible
with powermock (which doesn't support Java 16 or newer).
Filed KAFKA-15117 for a test that fails with Java 20 (SslTransportLayerTest.testValidEndpointIdentificationCN).
Finally, fixed some lossy conversions that were added after #13582 was submitted.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This PR adds the interface for grace period to the Joined object as well as uses the buffer. The majority of it is tests and moving some of the existing join logic.
Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Fixed a regression described in KAFKA-15053 that security.protocol only allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lower case and upper case values will be supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl)
Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
An example of the warning:
> warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy
There should be no change in behavior as part of these changes - runtime logic ensured
we didn't run into issues due to the lossy conversions.
Reviewers: Divij Vaidya <diviv@amazon.com>
Replaces EasyMock and PowerMock with Mockito in TimeOrderedCachingPersistentWindowStoreTest.
Reviewers: Divij Vaidya <diviv@amazon.com>, Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
KAFKA-14936: Add On Disk Time Ordered Buffer
Add a time ordered key-value buffer stored on disk and implemented using RocksDBTimeOrderedKeyValueSegmentedBytesStore.
This will be used in the stream buffer for joins with a grace period.
Reviewers: Bruno Cadonna <cadonna@confluent.io>, Victoria Xia <victoria.xia@confluent.io>
This pull requests migrates the Admin mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.
Reviewers: Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Fixes a bug regarding the state updater where tasks that experience corruption
during restoration are passed from the state updater to the stream thread
for closing and reviving but then the revived tasks are not re-added to
the state updater.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Change the TimeOrderedKeyValueBuffer to take three types to include the store type, so that it can be used for non-Change<V> operations as well.
Reviewers: Victoria Xia<victoria.xia@confluent.io> , Gabriel Gama <>
This patch rewrites `MockTime` in Java and moves it to the `server-common` module. This is a prerequisite to move `MockTimer` later on to `server-common` as well.
Reviewers: David Arthur <mumrah@gmail.com>
Replace usage of Cluster in StreamsMetadataState with Map<String, List<PartitionInfo>>. Update StreamsPartitionAssignor#onAssignment method to pass the existing Map<TopicPartition, PartitionInfo> instead of a fake Cluster object.
Behavior remains the same; updated existing unit tests accordingly.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Bill Bejeck <bbejeck@apache.org>
The following method was deprecated in 2.5 and removed in 3.0.0:
```java
// KafkaStreams.java
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);
```
However, many comments still reference the removed method, which can be confusing in the generated JavaDocs. The code in JavaDoc comments has been changed to reference the new method, store(final StoreQueryParameters<T> storeQueryParameters).
Also, minor changes to variable names in java doc to be context specific.
Reviewer: Bruno Cadonna <cadonna@apache.org>
This pull requests migrates the ActiveTaskCreator mock in TaskManagerTest from EasyMock to Mockito
The change is restricted to a single mock to minimize the scope and make it easier for review.
Please see two examples that follow the same pattern below:
#13529 and #13621
Reviewers: Divij Vaidya <diviv@amazon.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Christo Lolov <lolovc@amazon.com>, Bruno Cadonna <cadonna@apache.org>
The VersionedKeyValueToBytesStoreAdapter#isOpen API accidentally returns the value of inner.persistent() when it should be returning inner.isOpen().
Reviewers: Matthias J. Sax <mjsax@apache.org>, Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Victoria Xia <victoria.xia@confluent.io>
1. Migrates topology builder mock in TaskManagerTest to mockito.
2. Replaces the unit test to verify if subscribed partitions are added
to topology metadata.
3. Modifies signatures of methods for adding subscribed partitions to
topology metadata to use sets instead of lists. This makes the
intent of the methods clearer and makes the tests more portable.
Reviewers: Christo Lolov <lolovc@amazon.com>, Matthias J. Sax <mjsax@apache.org>
KIP-904 had the goal of saving space when encoding the size of a byte array. However, using UINT32 does not achieve this goal. This PR changes the encoding to VARINT instead.
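To see the saving, using Kafka's own ByteUtils (the buffer setup is illustrative):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.ByteUtils;

class VarintVsUint32 {
    public static void main(final String[] args) {
        final ByteBuffer asVarint = ByteBuffer.allocate(5);
        ByteUtils.writeVarint(3, asVarint);       // small sizes take 1 byte
        final ByteBuffer asUint32 = ByteBuffer.allocate(4);
        ByteUtils.writeUnsignedInt(asUint32, 3L); // always takes 4 bytes
        System.out.printf("varint=%d bytes, uint32=%d bytes%n",
            asVarint.position(), asUint32.position());
    }
}
```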
Reviewers: Victoria Xia <victoria.xia@confluent.io>, Farooq Qaiser <fqaiser94@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Stream-stream outer join uses a "shared time tracker" to track stream-time progress for the left and right input in a single place. This time tracker is incorrectly shared across tasks.
This PR introduces a supplier to create a "shared time tracker" object per task, to be shared between the left and right join processors.
Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Walker Carlson <wcarlson@confluent.io>
This PR fixes a bug in the table-table join handling of out-of-order records in versioned tables when the latest value for a particular key is a tombstone: it uses the isLatest value from the Change object instead of calling get(key) on the state store to fetch timestamps to compare against. As part of this fix, this PR also updates table-table joins to determine whether upstream tables are versioned by using the GraphNode mechanism, instead of checking the table's value getter.
Part of KIP-914.
Reviewer: Matthias J. Sax <matthias@confluent.io>
This reverts commit d9b139220e.
KIP-878 implementation did not make any progress, so we need to revert
the public API changes which are not functional right now.
Reviewers: Bill Bejeck <bill@confluent.io>
We incorrectly assumed that `consumer.position()` should always be
served by the consumer's locally set position.
However, within `commitNeeded()` we first check `if (commitNeeded)`
and thus go into the else branch only if we have not processed data
(otherwise, `commitNeeded` would be true). For this reason, we actually
don't know if the consumer has a valid position or not.
We should just swallow a timeout if the consumer cannot get the position
from the broker, and try the next partition. If any position advances, we
can return true, and if we timeout for all partitions we can return
false.
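Sketched, the described behavior looks like the following (hedged; helper names and the committed-offset bookkeeping are assumptions):

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

class CommitNeededSketch {
    static boolean anyPositionAdvanced(final Consumer<?, ?> consumer,
                                       final Set<TopicPartition> partitions,
                                       final Map<TopicPartition, Long> lastCommitted) {
        for (final TopicPartition tp : partitions) {
            try {
                final long position = consumer.position(tp, Duration.ofMillis(100));
                final Long committed = lastCommitted.get(tp);
                if (committed != null && position > committed) {
                    return true; // some position advanced
                }
            } catch (final TimeoutException swallow) {
                // broker did not answer in time; just try the next partition
            }
        }
        return false; // timed out (or saw no progress) on all partitions
    }
}
```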
Reviewers: Michal Cabak (@miccab), John Roesler <john@confluent.io>, Guozhang Wang <guozhand@confluent.io>
Using versioned-stores for global-KTables is not allowed, because a
global-table is bootstrapped on startup, and a stream-globalTable join
does not support temporal semantics.
Furthermore, `suppress()` does not support temporal semantics and thus
cannot be applied to a versioned KTable.
This PR disallows both use cases explicitly.
Part of KIP-914.
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Victoria Xia <victoria.xia@confluent.io>
Implements KIP-399.
Extends ProductionExceptionHandler to handle serialization errors, and to allow users to continue processing and dropping the corresponding record on the floor.
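A sketch of a handler using the new hook; the method names follow KIP-399, but check the exact signatures in your Kafka version:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class DropBadRecordsHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.FAIL; // non-serialization errors stay fatal
    }

    @Override
    @SuppressWarnings("rawtypes")
    public ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
                                                                           final Exception exception) {
        // Drop the record on the floor and continue processing.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```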
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
KIP-914 introduced a new boolean isLatest into Change to indicate whether a change update represents the latest for the key. Even though Change is serialized into the table repartition topic, the new boolean does not need to be serialized in, because the table repartition map processor performs an optimization to drop records for which isLatest = false. If not for this optimization, the downstream table aggregate would have to drop such records instead, and isLatest would need to be serialized into the repartition topic.
In light of the possibility that isLatest may need to be serialized into the repartition topic in the future, e.g., if other downstream processors are added which need to distinguish between records for which isLatest = true vs isLatest = false, this PR reserves repartition topic formats which include isLatest. Reserving these formats now comes at no additional cost to users since a rolling bounce is already required for the upcoming release due to KIP-904. If we don't reserve them now and instead have to add them later, then another bounce would be required at that time. Reserving formats is cheap, so we choose to do it now.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Part of KIP-914.
This PR adds an additional boolean isLatest into Change which specifies whether the new value is the latest for its key. For un-versioned stores, isLatest is always true. For versioned stores, isLatest is true if the value has the latest timestamp seen for the key, else false. This boolean will be used by processors such as the table repartition map processor to determine when a record is out-of-order and should be dropped (when processing a versioned table). This PR updates the table repartition map processor accordingly, and also adds test coverage for table filter.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Part of KIP-914.
This PR updates the return type of VersionedKeyValueStore#put(...) from void to long, where the long is the validTo timestamp of the newly put record, with two special values to indicate either that no such timestamp exists (because the record is the latest for its key) or that the put did not take place (because grace period has elapsed).
As part of making this change, VersionedBytesStore introduces its own put(key, value, timestamp) method to avoid method signature conflicts with the existing put(key, value) method from KeyValueStore<Bytes, byte[]> which has void return type. As a result, the previously added NullableValueAndTimestampSerde class is no longer needed so it's also been removed in this PR as cleanup.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR adds a method into GraphNode to assist in tracking whether tables are materialized as versioned or unversioned stores. This is needed in order to allow processors which have different behavior on versioned vs unversioned tables to use the correct semantics. Part of KIP-914.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR updates primary-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR updates the stream-table join processors, including both KStream-KTable and KStream-GlobalKTable joins, to perform timestamped lookups when the (global) table is versioned, as specified in KIP-914.
Reviewers: Matthias J. Sax <matthias@confluent.io>
1. I've verified that the only case in which the task would be null or not a StreamTask is in testing code, with pausing / resuming topologies. I've revamped the restoration-recording function, mainly to have just one method on the Task interface, to make sure we never get task == null and do not need to cast to StreamTask.
2. Use numRecords directly to avoid calling records.size() that triggers concurrent modifications.
3. Rewrite the TaskMetricsTest to not use the removed impl functions.
4. Found an issue while fixing 1) above; it turns out to be related to pausing tasks: if the tasks are paused because the instance / named topologies are paused while they need restoration, the restoration never finishes, and hence the instance's state does not transition to RUNNING. Similarly, if the user pauses just one of the named topologies right at the beginning, since the state does not transition to RUNNING, no tasks across any of the named topologies make progress. We keep the behavior as is, to be consistent with and without the state updater.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
In preparation for updating DSL join processors to have updated semantics when versioned stores are used (cf KIP-914), this PR adds test coverage for out-of-order data in joins to the existing integration tests for stream-table joins and primary-key table-table joins. Follow-up PRs will build on top of this change by adding new tests for versioned stores, and the out-of-order data will produce different results in those settings.
Reviewers: Matthias J. Sax <matthias@confluent.io>
In preparation for updating DSL processors to use versioned stores (cf KIP-914), this PR adds two new methods to KTableValueGetter: isVersioned() and get(key, asOfTimestamp) and updates all existing implementations accordingly.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Adds an integration test for the manual upgrade scenario to upgrade a non-versioned store to a versioned store. The procedure is outlined in KIP-889 and also in the docs.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Under at-least-once, we want to checkpoint the progress after completing restoration, to prevent losing that progress and needing to restore from scratch.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This fix is inspired by #12540.
1. Added a clearCache function for CachedStateStore, which would be triggered upon recycling a state manager.
2. Added the integration test inherited from #12540 .
3. Improved some log4j entries.
4. Found and fixed a minor issue with log4j prefix.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Until this PR, all the code added for KIP-889 for introducing versioned stores to Kafka Streams has been accessible from internal packages only. This PR exposes the stores via public Stores.java methods, and also updates the TopologyTestDriver.
Reviewers: Matthias J. Sax <matthias@confluent.io>
1. Fix the StateUpdater shutdown procedure: a) in shutdown, we first set the running flag, then notify the condition; b) in the thread's waitIfAllChangelogsCompletelyRead block, we collapse the if condition together with the while condition so that we always check all four conditions once the thread is notified inside the while loop. As a result, shutdown procedure would not involve any thread interruptions anymore.
2. Print fine-grained streams exception when list-offset fails, this is a byproduct of the debugging procedure but I think it's worth keeping since it has better operational visibilities.
3. Some nit logging improvements (including moving logger from the inner thread into the outer class to also add some more logging).
4. Re-enable state-updater in SmokeTestDriverIntegrationTest.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <cadonna@apache.org>
The RocksDB-based versioned store implementation introduced in KIP-889 currently uses two physical RocksDB instances per store instance: one for the "latest value store" and another for the "segments store." This PR combines those two RocksDB instances into one by representing the latest value store as a special "reserved" segment within the segments store. This reserved segment has segment ID -1, is never expired, and is not included in the regular Segments methods for getting or creating segments, but is represented in the physical RocksDB instance the same way as any other segment.
Reviewers: Matthias J. Sax <matthias@confluent.io>
The RocksDB-based implementation of versioned stores introduced via KIP-889 consists of a "latest value store" and separate (logical) "segments stores." A single put operation may need to modify multiple (two) segments, or both a segment and the latest value store, which opens the possibility to store inconsistencies if the first write succeeds while the later one fails. When this happens, Streams will error out, but the store still needs to be able to recover upon restart. This PR adds the necessary repair logic into RocksDBVersionedStore to effectively undo the earlier failed write when a store inconsistency is encountered.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Adds integration tests for the new versioned stores introduced in KIP-889.
This PR also contains a small bugfix for the restore record converter, required to get the tests above to pass: even though versioned stores are timestamped stores, we do not want to use the record converter for prepending timestamps when restoring a versioned store.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Part of KIP-889.
Prior to this PR, versioned stores always returned null for get(key, timestamp) calls where the timestamp has exceeded the store's history retention, even if the latest value for the key (i.e., the one returned from get(key)) satisfies the timestamp bound. This was an oversight from the earlier implementation -- get(key, timestamp) should still return a record in this situation since the record exists in the store. This PR updates both the javadocs and the implementation accordingly.
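As a toy model of the fixed semantics (all names are illustrative; the real RocksDBVersionedStore is far more involved):
```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of the fixed lookup semantics described above.
public class VersionedGetSketch {
    record VersionedRecord(String value, long timestamp) {}

    private final Map<String, VersionedRecord> latestValueStore = new HashMap<>();
    // Older record versions per key, keyed by their timestamp.
    private final Map<String, NavigableMap<Long, VersionedRecord>> history = new HashMap<>();
    private long observedStreamTime = 0L;
    private final long historyRetentionMs = 300_000L; // illustrative value

    void put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        VersionedRecord previous = latestValueStore.put(key, new VersionedRecord(value, timestamp));
        if (previous != null) {
            history.computeIfAbsent(key, k -> new TreeMap<>()).put(previous.timestamp(), previous);
        }
    }

    VersionedRecord get(String key, long asOfTimestamp) {
        VersionedRecord latest = latestValueStore.get(key);
        if (latest != null && latest.timestamp() <= asOfTimestamp) {
            // The fix: the latest record was already in effect at asOfTimestamp,
            // so it is returned even when asOfTimestamp predates history retention.
            return latest;
        }
        if (asOfTimestamp < observedStreamTime - historyRetentionMs) {
            return null; // older record versions have been expired
        }
        NavigableMap<Long, VersionedRecord> versions = history.get(key);
        Map.Entry<Long, VersionedRecord> entry =
            versions == null ? null : versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```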
Reviewers: Matthias J. Sax <matthias@confluent.io>
Sets the correct topic configs for changelog topics for versioned stores introduced in KIP-889. Changelog topics for versioned stores differ from those for non-versioned stores only in that min.compaction.lag.ms needs to be set in order to prevent version history from being compacted prematurely.
The value for min.compaction.lag.ms is equal to the store's history retention plus some buffer to account for the broker's use of wall-clock time in performing compactions. This buffer is analogous to the windowstore.changelog.additional.retention.ms value for window store changelog topic retention time, and uses the same default of 24 hours. In the future, we can propose a KIP to expose a config such as versionedstore.changelog.additional.compaction.lag.ms to allow users to tune this value.
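A sketch of the resulting config rule (the helper itself is illustrative; the config name and the 24-hour buffer come from the description above):
```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Sketch of the changelog config rule for versioned stores.
public class VersionedChangelogConfigSketch {
    static final long ADDITIONAL_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(24);

    static Map<String, String> changelogConfig(long historyRetentionMs) {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact");
        // History retention plus a buffer for the broker's wall-clock-based
        // compaction, analogous to windowstore.changelog.additional.retention.ms.
        config.put("min.compaction.lag.ms",
                   String.valueOf(historyRetentionMs + ADDITIONAL_COMPACTION_LAG_MS));
        return config;
    }
}
```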
Reviewers: Matthias J. Sax <matthias@confluent.io>
Part of KIP-889.
AbstractSegments automatically calls the helper method to clean up expired segments as part of getOrCreateSegmentIfLive(). This works fine for windowed store implementations which call getOrCreateSegmentIfLive() exactly once per put() call, but is inefficient and difficult to reason about for the new RocksDBVersionedStore implementation (cf. #13188) which makes potentially multiple calls to getOrCreateSegmentIfLive() for different segments for a single put() call. This PR addresses this by refactoring the call to clean up expired segments out of getOrCreateSegmentIfLive(), opting to have the different segments implementations specify when cleanup should occur instead. After this PR, RocksDBVersionedStore only cleans up expired segments once per call to put().
Reviewers: Matthias J. Sax <matthias@confluent.io>
A previous change disabled strict stubbing for the `RocksDBMetricsRecorderTest`. To re-enable the behavior in JUnit-5, we need to pull in a new dependency in the `streams` gradle project.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR introduces VersionedKeyValueStoreBuilder for building the new versioned stores introduced in KIP-889, analogous to the existing TimestampedKeyValueStoreBuilder for building timestamped stores. This PR also updates the existing KTable store materializer class to materialize versioned stores in addition to timestamped stores. As part of this change, the materializer is renamed from TimestampedKeyValueStoreMaterializer to simply KeyValueStoreMaterializer.
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
Documentation only: minor clarification on how max.warmup.replicas works; specifically, that one "warmup replica" corresponds to a Task that is restoring its state. Also clarifies how max.warmup.replicas interacts with probing.rebalance.interval.ms.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
As part of introducing versioned key-value stores in KIP-889, we want to lift the existing DSL restriction that KTable stores are always TimestampedKeyValueStores to allow for KTable stores which are VersionedKeyValueStores instead. This PR lifts this restriction by replacing raw usages of TimestampedKeyValueStore with a new KeyValueStoreWrapper which supports either TimestampedKeyValueStore or VersionedKeyValueStore.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Found a few flaky tests while reviewing another PR. The root cause seems to be changing the return behavior of `when` in Mockito. Fixed those without using `reset`, and also bumped a couple of debug log lines to info since they could be very helpful in debugging.
Reviewers: Luke Chen <showuon@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Introduces the metered store layer for the new versioned key-value store introduced in KIP-889. This outermost, metered store layer handles all serialization/deserialization from VersionedKeyValueStore to a bytes-representation (VersionedBytesStore) so that all inner stores may operate only with bytes types.
Reviewers: Matthias J. Sax <matthias@confluent.io>
* Moved pausing-tasks logic out of the commit-interval loop into the top-level loop, similar to resuming tasks.
* Added thread-level restoration metrics.
* Related unit tests.
Reviewers: Lucas Brutschy <lucasbru@users.noreply.github.com>, Matthias J. Sax <matthias@confluent.io>
Kafka Streams is supposed to handle TimeoutException during internal topic creation gracefully. This PR fixes the exception handling code to avoid crashing on a TimeoutException returned by the admin client.
Reviewer: Matthias J. Sax <matthias@confluent.io>, Colin Patrick McCabe <cmccabe@apache.org>, Alexandre Dupriez (@Hangleton)
Introduces the changelogging layer for the new versioned key-value store introduced in KIP-889. The changelogging layer operates on VersionedBytesStore rather than VersionedKeyValueStore so that the outermost metered store can serialize to bytes once and then all inner stores operate only with bytes types.
Reviewers: Matthias J. Sax <matthias@confluent.io>
* Fixes required to make the PauseResumeIntegrationTest pass. It was not enabled and it does not pass for the state updater code path.
* Make sure no progress is made on paused topologies. The state updater performed one round of polling from the restore
consumer before realizing that a newly added task was already paused when it was added.
* Wake up state updater when tasks are being resumed. If a task is resumed, it may be necessary to wake up the state updater from waiting on the tasksAndActions condition.
* Make sure that allTasks methods also return the tasks that are currently being restored.
* Enable PauseResumeIntegrationTest and upgrade it to JUnit5.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Guozhang Wang <wangguoz@gmail.com>
As part of introducing versioned key-value stores in KIP-889, we'd like a way to represent a versioned key-value store (VersionedKeyValueStore<Bytes, byte[]>) as a regular key-value store (KeyValueStore<Bytes, byte[]>) in order to be compatible with existing DSL methods for passing key-value stores, e.g., StreamsBuilder#table() and KTable methods, which are explicitly typed to accept Materialized<K, V, KeyValueStore<Bytes, byte[]>>. This way, we do not need to introduce new versions of all relevant StreamsBuilder and KTable methods to relax the Materialized type to accept versioned stores.
This PR introduces the new VersionedBytesStore extends KeyValueStore<Bytes, byte[]> interface for this purpose, along with the corresponding supplier (VersionedBytesStoreSupplier) and implementation (RocksDbVersionedKeyValueBytesStoreSupplier). The RocksDbVersionedKeyValueBytesStoreSupplier implementation leverages an adapter (VersionedKeyValueToBytesStoreAdapter) to assist in converting from VersionedKeyValueStore to VersionedBytesStore.
Reviewers: Matthias J. Sax <matthias@confluent.io>
I noticed this issue when tracing #12590.
StreamThread closes the consumer before changing state to DEAD. If the partition rebalance happens quickly, the other StreamThreads can't change the KafkaStreams state from REBALANCING to RUNNING since there is a PENDING_SHUTDOWN StreamThread.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Includes 2 requirements from the ticket:
* Include the number of members in the group (i.e., "15 members participating" and "to 15 clients as")
* Sort the member ids (to help compare the membership and assignment across rebalances)
Reviewers: Guozhang Wang <wangguoz@gmail.com>
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
Introduces a new Serde, that serializes a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with timestamp into the versioned store).
Part of KIP-889.
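One plausible byte layout for such a serde, assuming a fixed 8-byte timestamp followed by a null marker (this is an illustration, not necessarily the exact layout the PR chose):
```java
import java.nio.ByteBuffer;

// Illustrative codec: [ 8-byte timestamp | 1-byte null marker | value bytes ].
public class NullableValueAndTimestampCodec {

    static byte[] serialize(byte[] value, long timestamp) {
        if (value == null) {
            // A tombstone: timestamp is kept, value is marked as null.
            return ByteBuffer.allocate(9).putLong(timestamp).put((byte) 0).array();
        }
        return ByteBuffer.allocate(9 + value.length)
                .putLong(timestamp)
                .put((byte) 1)
                .put(value)
                .array();
    }

    static long timestamp(byte[] serialized) {
        return ByteBuffer.wrap(serialized).getLong();
    }

    static byte[] value(byte[] serialized) {
        ByteBuffer buf = ByteBuffer.wrap(serialized);
        buf.getLong(); // skip timestamp
        if (buf.get() == (byte) 0) {
            return null; // tombstone
        }
        byte[] value = new byte[buf.remaining()];
        buf.get(value);
        return value;
    }
}
```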
Reviewers: Matthias J. Sax <matthias@confluent.io>
Changes the versioned store semantics to define an explicit "grace period" property. Grace period will always be equal to the history retention, though in the future we could introduce a new KIP to expose options to configure grace period separately.
Part of KIP-889.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR builds on the new RocksDB-based versioned store implementation (see KIP-889) by adding code for restoring from changelog. The changelog topic format is the same as for regular timestamped key-value stores: record keys, values, and timestamps are stored in the Kafka message key, value, and timestamp, respectively. The code for actually writing to this changelog will come in a follow-up PR.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Introduces the VersionedKeyValueStore interface proposed in KIP-889, along with the RocksDB-based implementation of the interface. This PR includes fully functional put, get, get-with-timestamp, and delete operations, but does not include the ability to restore records from changelog or surrounding store layers (for metrics or writing to the changelog). Those pieces will come in follow-up PRs.
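A simplified sketch of the interface shape and its intended usage (reduced from the actual org.apache.kafka.streams.state.VersionedKeyValueStore; in Streams the store instance is provided by the runtime):
```java
// Sketch of the KIP-889 interface shape; heavily simplified.
public class VersionedStoreUsageSketch {
    record VersionedRecord<V>(V value, long timestamp) {}

    interface VersionedKeyValueStore<K, V> {
        void put(K key, V value, long timestamp);
        VersionedRecord<V> delete(K key, long timestamp);
        VersionedRecord<V> get(K key);                     // latest version
        VersionedRecord<V> get(K key, long asOfTimestamp); // point-in-time
    }

    static void example(VersionedKeyValueStore<String, String> store) {
        store.put("k", "v1", 10L);        // version valid from t=10
        store.put("k", "v2", 20L);        // supersedes v1 from t=20 onward
        VersionedRecord<String> latest = store.get("k");     // -> v2
        VersionedRecord<String> asOf = store.get("k", 15L);  // -> v1
        store.delete("k", 30L);           // tombstone with timestamp t=30
    }
}
```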
Reviewers: Matthias J. Sax <matthias@confluent.io>
1. The major fix: synchronize access to tasks inside the task manager; this fixes a regression introduced in #12397.
2. Clarify the names of StreamThread functions that may be triggered from outside the StreamThread.
3. Minor cleanups.
Reviewers: Lucas Brutschy <lucasbru@users.noreply.github.com>
1. Add the new API (default impl is empty) to StateRestoreListener.
2. Update related unit tests
Reviewers: Lucas Brutschy <lucasbru@users.noreply.github.com>, Matthias J. Sax <mjsax@apache.org>
Related to KAFKA-14059 and KAFKA-14132
* Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest.java
* Reset removed which was not needed
Reviewers: Divij Vaidya <diviv@amazon.com>, Guozhang Wang <wangguoz@gmail.com>
Part of KIP-889.
The KIP proposed the introduction of versioned key-value stores, as well as a RocksDB-based implementation. The RocksDB implementation will consist of a "latest value store" for storing the latest record version associated with each key, in addition to multiple "segment stores" to store older record versions. Within a segment store, multiple record versions for the same key will be combined into a single bytes array "value" associated with the key and stored to RocksDB.
This PR introduces the utility class that will be used to manage the value format of these segment stores, i.e., how multiple record versions for the same key will be combined into a single bytes array "value." Follow-up PRs will introduce the versioned store implementation itself (which calls heavily upon this utility class).
Reviewers: Matthias J. Sax <matthias@confluent.io>
Make sure no scaladoc warnings are emitted from the streams-scala project build.
We cannot fully fix all scaladoc warnings due to limitations of the scaladoc tool,
so this is a best-effort attempt at fixing as many warnings as possible. We also
disable one problematic class of scaladoc warnings (link errors) in the gradle build.
The causes of existing warnings are that we link to java members from scaladoc, which
is not possible, or we fail to disambiguate some members.
The broad rule applied in the changes is
- For links to Java members such as [[StateStore]], we use the fully qualified name in a code tag
to make manual link resolution via a search engine easy.
- For some common terms that are also linked to Java members, like [[Serde]], we omit the link.
- We disambiguate where possible.
- In the special case of @throws declarations with Java Exceptions, we do not seem to be able
to avoid the warning altogether.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
This PR refactors how the list of open iterators for RocksDB stores is managed. Prior to this PR, the `openIterators` list was passed into the constructor for the iterators themselves, allowing `RocksDbIterator.close()` to remove the iterator from the `openIterators` list. After this PR, the iterators themselves will not know about lists of open iterators. Instead, a generic close callback is exposed, and it's the responsibility of the store that creates a new iterator to set the callback on the iterator, to ensure that closing an iterator removes the iterator from the list of open iterators.
This refactor is desirable because it enables more flexible iterator lifecycle management. Building on top of this, RocksDBStore is updated with an option to allow the user (i.e., the caller of methods such as `range()` and `prefixScan()` which return iterators) to pass a custom `openIterators` list for the new iterator to be stored in. This will allow for a new Segments implementation where multiple Segments can share the same RocksDBStore instance, while having each Segment manage its own open iterators.
Part of KIP-889.
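A sketch of the resulting ownership model, where the iterator only knows a generic close callback and the creating store decides which open-iterator list that callback maintains (all names here are illustrative):
```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the close-callback pattern described above.
public class CloseCallbackSketch {

    interface ManagedIterator extends AutoCloseable {
        void onClose(Runnable callback);
        @Override void close();
    }

    static class SketchIterator implements ManagedIterator {
        private Runnable closeCallback = () -> { };

        @Override public void onClose(Runnable callback) {
            this.closeCallback = callback;
        }

        @Override public void close() {
            closeCallback.run(); // e.g., remove this iterator from a list
        }
    }

    // The store wires the callback when handing out an iterator, optionally
    // into a caller-supplied list (enabling shared RocksDB instances where
    // each logical segment tracks its own open iterators).
    static ManagedIterator newIterator(List<ManagedIterator> openIterators) {
        SketchIterator iter = new SketchIterator();
        openIterators.add(iter);
        iter.onClose(() -> openIterators.remove(iter));
        return iter;
    }

    public static void main(String[] args) {
        List<ManagedIterator> open = new ArrayList<>();
        ManagedIterator it = newIterator(open);
        it.close();
        System.out.println("open iterators after close: " + open.size()); // 0
    }
}
```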
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR adds a safeguard against divide-by-zero. While `totalCapacity` can never be zero, an explicit error message is desirable.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
I immediately saw a failure with stateUpdaterEnabled = true after disabling the false parameter, which suggests the problem actually does lie in the state updater itself and not in the parameterization of the test. To verify this theory, and help stabilize the 3.4 release branch, let's try one more test by swapping out the true build in favor of the false one. If the listOffsets requests stop failing and causing this integration test to hit the global timeout as is currently happening at such a high rate, then we have pretty good evidence pointing at the state updater and should be able to debug things more easily from there.
After getting in a few builds to see whether the flakiness subsides, we should merge this PR to re-enable both parameters going forward: https://github.com/apache/kafka/pull/13155
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Need to get a clean build for 3.4 and this test has been extremely flaky. I'm looking into the failure as well, and want to pinpoint whether it's the true build that's broken or it's the parameterization itself causing this -- thus, let's start by temporarily disabling the false parameter first.
See KAFKA-14533 for more details
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The timeouts used for starting up Streams and waiting for the RUNNING state are all over the place across our integration tests, with some as low as 15s (which are unsurprisingly rather flaky). We use 60s as the default timeout for other APIs in the IntegrationTestUtils, so we should do the same for #startApplicationAndWaitUntilRunning.
I also noticed that we have several versions of that exact API in StreamsTestUtils, so I migrated everyone over to IntegrationTestUtils#startApplicationAndWaitUntilRunning and added a few overloads for ease of use, including one for single KafkaStreams apps and one for using the default timeout.
Reviewers: Matthias J. Sax <mjsax@apache.org>
While working on the 3.4 release I noticed we've built up an embarrassingly long list of warnings within the Streams javadocs. It's unavoidable for some links to break as the source code changes, but let's reset back to a good state before the list gets even longer
Reviewers: Matthias J. Sax <mjsax@apache.org>, Walker Carlson <wcarlson@confluent.io>
While wrapping the caught exception into a custom one, information about the caught
exception was being lost, including its stack trace. When re-throwing an exception,
we now either include the original exception as the cause or add the relevant
information to the exception message.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>, Matthew de Detrich <mdedetrich@gmail.com>
In the new state restoration code, the state updater needs to be checked regularly
by the main thread to transfer ownership of tasks back to the main thread once the
state of the task is restored. The more often we check this, the faster we can
start processing the tasks.
Currently, we only check the state updater once in every loop iteration of the main
thread. And while we could not observe this being too infrequent in practice, we can
easily increase the number of checks by moving the check inside the inner processing
loop. This means that once we have iterated over `numIterations` records, we can
already start processing tasks that have finished restoration in the meantime.
Reviewer: Bruno Cadonna <cadonna@apache.org>
This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14003. It is the second of a series of pull requests which address the move of Kafka Streams tests from JUnit 4 to JUnit 5.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The three pom files for the Streams quickstart also need to bump their versions after a branch cut. For some reason these are included (late) in the Release Process guide, but are missing from the list of what to update when bumping the version in gradle.properties. This commit adds the missing files to this list to help future RMs locate all the required version changes.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Additional notable changes to fix multiple dependency ordering issues:
* Moved `ConfigSynonym` to `server-common`
* Moved synonyms from `LogConfig` to `ServerTopicConfigSynonyms`
* Removed `LogConfigDef` `define` overrides and relied on
`ServerTopicConfigSynonyms` instead.
* Moved `LogConfig.extractLogConfigMap` to `KafkaConfig`
* Consolidated relevant defaults from `KafkaConfig`/`LogConfig` in the latter
* Consolidate relevant config name definitions in `TopicConfig`
* Move `ThrottledReplicaListValidator` to `storage`
Reviewers: Satish Duggana <satishd@apache.org>, Mickael Maison <mickael.maison@gmail.com>
Escape the `>` character in javadoc
Escape the `$` character when part of `${}` in scaladoc as this is the way to reference a variable
Reviewers: Matthias J. Sax <matthias@confluent.io>
Various tests in the streams package were leaking native memory.
Most tests were fixed by closing the corresponding rocksdb resource.
I tested that the corresponding leak is gone by using a previous rocksdb
release with finalizers and checking if the finalizers would be called at some
point.
Reviewer: Bruno Cadonna <cadonna@apache.org>
StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores and StoreQueryIntegrationTest#shouldQueryOnlyActivePartitionStoresByDefault have logic to find active partitions by doing a modulo with 2 and comparing the remainder. This can break when a new test is added: since JUnit chooses an arbitrary order to run the tests, the modulo checks can fail. This PR tries to make them deterministic.
Also, this PR uses JUnit 5 annotations so that the cluster and input topic can be set up/destroyed once.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The call to `interrupt` on the state updater thread during shutdown
could interrupt the thread while writing the checkpoint file. This
can cause a failure to write the checkpoint file and a misleading
stack trace in the logs.
Reviewer: Bruno Cadonna <cadonna@apache.org>
This PR updates StateQueryResult.getOnlyPartitionResult() to not throw an IllegalArgumentException when there are 0 query results.
Added a test that will fail without this patch.
Reviewers: John Roesler<vvcephei@apache.org>
Follow up PR for KIP-837. We don't want to allow multicasting for IQ. This PR imposes that restriction.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
There is unwanted logging introduced by #12803 as pointed out in this comment: #12803 (comment). This PR removes it.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence was marked as Ignored.
That seemed to be because of the way StreamsConfig was being initialised: any new test would have used the same names, so the second test never reached the desired state. With this PR, every test gets a unique app name, which seems to have fixed the issue. Also includes a couple of cosmetic changes.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
First PR for KIP-878: Internal Topic Autoscaling for Kafka Streams
Introduces two new configs related to autoscaling in Streams: a feature flag and retry timeout. This PR just adds the configs and gets them passed through to the Streams assignor where they'll ultimately be needed/used
Reviewers: Bill Bejeck <bill@confluent.io>, Walker Carlson <wcarlson@confluent.io>
RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7).
The problem was that every call to options.statistics creates a new wrapper object that needs to be finalized.
I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Christo Lolov <lolovc@amazon.com>
As a result of "14260: InMemoryKeyValueStore iterator still throws ConcurrentModificationException", I'm adding synchronized to prefixScan as an alternative to going back to the ConcurrentSkipList.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Optimization of `ThreadCache`. The original implementation showed significant slow-down when many caches were registered.
`sizeBytes` was called at least once, and potentially many times
in every `put` and was linear in the number of caches (= number of
state stores, so typically proportional to number of tasks). That
means, with every additional task, every put gets a little slower.
This was confirmed experimentally.
In this change, we modify the implementation of `ThreadCache` to
keep track of the total size in bytes. To be independent of the
concrete implementation of the underlying cache, we update the size
by subtracting the old and adding the new size of the cache before
and after every modifying operation. For this we acquire the object
monitor of the cache, but since all modifying operations on the caches
are synchronized already, this should not cause extra overhead.
This change also fixes a `ConcurrentModificationException` that could
be thrown in a race between `sizeBytes` and `getOrCreate`.
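The core idea, as a sketch (names are illustrative; the real ThreadCache and NamedCache are more involved):
```java
// Sketch of the size-tracking idea: instead of summing sizeBytes() over all
// caches on every put (linear in the number of caches), maintain a running
// total updated around each modifying operation.
public class ThreadCacheSizeSketch {
    private long totalSizeBytes = 0;

    interface NamedCache {
        long sizeInBytes();
        void put(byte[] key, byte[] value);
    }

    void put(NamedCache cache, byte[] key, byte[] value) {
        // Modifying operations on caches are already synchronized, so taking
        // the cache's monitor here adds no extra contention in practice.
        synchronized (cache) {
            long before = cache.sizeInBytes();
            cache.put(key, value);
            totalSizeBytes += cache.sizeInBytes() - before;
        }
    }

    long sizeBytes() {
        return totalSizeBytes; // O(1) instead of O(number of caches)
    }
}
```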
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR implements KIP-837 which enhances StreamPartitioner to multicast records.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, YEONCHEOL JANG
SmokeTestDriverIntegrationTest.java can be flaky because a NullPointerException prevents the retry mechanism that was added to prevent flakiness for this test. This change prevents the NullPointerException and hence allows the test to retry itself.
Reviewers: Luke Chen <showuon@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
In this change, we enable backing off when the state directory
is still locked during initialization of a task. When the state
directory is locked, the task is reinserted into the
initialization queue. We will reattempt to acquire the lock
after the next round of polling.
Tested through a new unit test.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The original implementation of the state updater could not
handle double rebalances within one poll phase correctly,
because it could create tasks more than once if they hadn't
finished initialization yet.
In a55071a, we
moved initialization to the state updater to fix this. However,
with more testing, I found out that this implementation has
its problems as well: there are problems with locking the
state directory (state updater acquired the lock to the state
directory, so the main thread wouldn't be able to clear the
state directory when closing the task), and benchmarks also
show that this can lead to useless work (tasks are being
initialized, although they will be taken from the thread soon
after in a follow-up rebalance).
In this PR, I propose to revert the original change, and fix
the original problem in a much simpler way: When we
receive an assignment, we simply clear out the
list of tasks pending initialization. This way, no double
task instantiations can happen.
The change was tested in benchmarks, system tests,
and the existing unit & integration tests. We also add
the state updater to the smoke integration test, which
triggered the double task instantiations before.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Batch 2 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
Reviewers: Matthew de Detrich <matthew.dedetrich@aiven.io>, Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing loops. If we trigger a new rebalance while the current one is still ongoing, it may cause some members to fail the first rebalance if they weren't able to send the SyncGroup request in time (for example due to processing records during the rebalance). This means those consumers never receive their assignment from the original rebalance, and won't revoke any partitions they might have needed to. This can send the group into a loop as each rebalance schedules a new followup cooperative rebalance due to partitions that need to be revoked, and each followup rebalance causes some consumer(s) to miss the SyncGroup and never revoke those partitions.
Reviewers: John Roesler <vvcephei@apache.org>
While restoring a batch of records, RocksDBStore was iterating the ConsumerRecords, building a list of KeyValues, and then iterating that list of KeyValues to add them to the RocksDB batch.
Simply adding the key and value directly to the RocksDB batch prevents this unnecessary second iteration and the creation of intermediate KeyValue objects, improving the performance of state restoration and reducing unnecessary object allocation.
This also simplifies the API of RocksDBAccessor, as prepareBatchForRestore is no longer needed.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Walker Carlson <wcarlson@confluent.io>
With the addition of the new Processor API, the newly added FixedKeyProcessorNodeFactory extends the ProcessorNodeFactory class. The ProcessorNodeFactory had a private field Set<String> stateStoreNames initialized to an empty set. The FixedKeyProcessorNodeFactory also had a private field Set<String> stateStoreNames.
When InternalTopologyBuilder.build executes the buildProcessorNode method, every node factory is passed as a ProcessorNodeFactory, so when the method references the stateStoreNames field it points to the superclass field, which is empty; the corresponding StoreBuilder(s) are therefore never added, causing an NPE in the topology.
This PR makes the field protected on the ProcessorNodeFactory class so FixedKeyProcessorNodeFactory inherits it.
The added test fails without this change.
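A minimal standalone illustration of the underlying Java behavior: field access is resolved statically, so a field re-declared in a subclass shadows, rather than overrides, the superclass field (class names mirror the ones above; the rest is illustrative):
```java
import java.util.HashSet;
import java.util.Set;

// Minimal illustration of the shadowing bug described above.
public class FieldShadowingSketch {
    static class ProcessorNodeFactory {
        Set<String> stateStoreNames = new HashSet<>();
    }

    static class FixedKeyProcessorNodeFactory extends ProcessorNodeFactory {
        // Re-declaring the field shadows the superclass field.
        Set<String> stateStoreNames = new HashSet<>();
    }

    public static void main(String[] args) {
        FixedKeyProcessorNodeFactory factory = new FixedKeyProcessorNodeFactory();
        factory.stateStoreNames.add("store"); // writes the subclass field

        ProcessorNodeFactory asSuper = factory;
        // Field access is resolved statically, so this reads the superclass
        // field, which is still empty -- the root cause of the NPE.
        System.out.println(asSuper.stateStoreNames.isEmpty()); // prints true
    }
}
```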
Reviewers: Matthias J. Sax <mjsax@apache.org>, Sophie Blee-Goldman <sophie@confluent.io>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
The state updater code path puts tasks into an
"initialization queue", with created, but not initialized tasks.
These are later, during the event-loop, initialized and added
to the state updater. This might lead to losing track of those
tasks - in particular it is possible to create
tasks twice, if we do not go once around `runLoop` to initialize
the task. This leads to `IllegalStateExceptions`.
By handing the task to the state updater immediately and let the
state updater initialize the task, we can fulfil our promise to
preserve the invariant "every task is owned by either the task
registry or the state updater".
Reviewer: Bruno Cadonna <cadonna@apache.org>
Add a new #transactionInFlight API to the StreamsProducer to expose the flag of the same name, then check whether there is an open transaction when we determine whether or not to perform a commit in TaskExecutor. This is to avoid unnecessarily dropping out of the group on transaction timeout in the case a transaction was begun outside of regular processing, e.g., when a punctuator forwards records but there are no newly consumed records and thus no new offsets to commit.
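A sketch of the resulting commit decision (StreamsProducerLike is a stand-in interface exposing the transactionInFlight flag this PR adds to StreamsProducer; the surrounding logic simplifies TaskExecutor):
```java
// Sketch of the commit decision described above.
public class CommitDecisionSketch {
    interface StreamsProducerLike {
        boolean transactionInFlight();
    }

    static boolean shouldCommit(int newConsumedOffsets, StreamsProducerLike producer) {
        // Commit when there are new offsets, or when a transaction was begun
        // outside regular processing (e.g., a punctuator forwarded records),
        // so the open transaction does not hit the transaction timeout.
        return newConsumedOffsets > 0 || producer.transactionInFlight();
    }
}
```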
Reviewer: Bruno Cadonna <cadonna@apache.org>
Users have been seeing a large number of these error messages being logged by the RecordCollectorImpl:
Unable to records bytes produced to topic XXX by sink node YYY as the node is not recognized.
It seems like we try to save all known sink nodes when the record collector is constructed, but we do so by going through the known sink topics, which means we could miss some nodes, for example if dynamic topic routing is used. Previously we were logging an error and would skip recording the metric if we tried to send a record from a sink node it didn't recognize, but there's not really any reason to have been tracking the sensors by node in the first place -- we can just track the actual sink topics themselves.
Reviewers: John Roesler <vvcephei@apache.org>, Christo Lolov <lolovc@amazon.com>
RestoreIntegrationTest used polling to determine if a rebalance
happens on one client, but if the rebalance would happen too quickly,
the polling would not pick it up and the check would time out.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The state updater code path introduced allocation and synchronization
overhead by performing relatively heavy operations in every iteration of
the StreamThread loop. This includes various allocations and acquiring
locks for handling `removedTasks` and `failedTasks`, even if the
corresponding queues are empty.
This change introduces `hasRemovedTasks` and
`hasExceptionsAndFailedTasks` in the `StateUpdater` interface that
can be used to skip over any allocation or synchronization. The new
methods do not require synchronization or memory allocation.
This change increases throughput by ~15% in one benchmark.
We extend existing unit tests to cover the slightly modified
behavior.
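A sketch of the skip-if-empty pattern, assuming an atomic counter answers the common "anything to drain?" question without locking (all names are illustrative):
```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a lock-free counter lets the StreamThread skip locking and
// allocation on the (very common) empty case.
public class StateUpdaterDrainSketch {
    private final Queue<String> removedTasks = new ArrayDeque<>();
    private final AtomicInteger removedCount = new AtomicInteger();

    void addRemovedTask(String task) {
        synchronized (removedTasks) {
            removedTasks.add(task);
            removedCount.incrementAndGet();
        }
    }

    // New-style cheap check: no lock, no allocation.
    boolean hasRemovedTasks() {
        return removedCount.get() > 0;
    }

    void drainRemovedTasksIfAny() {
        if (!hasRemovedTasks()) {
            return; // fast path taken on almost every loop iteration
        }
        synchronized (removedTasks) {
            while (!removedTasks.isEmpty()) {
                removedCount.decrementAndGet();
                handle(removedTasks.poll());
            }
        }
    }

    private void handle(String task) { /* transfer ownership, etc. */ }
}
```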
Reviewer: Bruno Cadonna <cadonna@apache.org>
The streams upgrade system test inserted FK join code for every version of
the StreamsUpgradeTest except for the latest. Also, the original code
never switched on the `test.run_fk_join` flag for the target version of
the upgrade.
The effect was that FK join upgrades were not tested at all, since
no FK join code was executed after the bounce in the system test.
We introduce `extra_properties` in the system tests, that can be used
to pass any property to the upgrade driver, which is supposed to be
reused by system tests for switching on and off flags (e.g. for the
state restoration code).
Reviewers: Alex Sorokoumov <asorokoumov@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
The state updater can enter a busy polling loop if it
only updates standby tasks. We need to always use the user-provided
poll time when using the state updater, since
the only other place where the state update blocks
(inside `waitIfAllChangelogsCompletelyRead`) is also
not blocking if there is at least one standby task.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
Improves logs within Streams by replacing raw timestamps with formatted dates to improve readability.
Approach: adds a function within common.utils.Utils to convert a given long timestamp to a date-time string with the same format used by Kafka's logger.
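A sketch of such a helper, assuming the "yyyy-MM-dd HH:mm:ss,SSS" pattern commonly used by log4j (the exact pattern and method name in the actual Utils may differ):
```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Sketch of the timestamp-to-date helper described above.
public class LogDateFormatSketch {
    private static final DateTimeFormatter LOG_DATE_TIME_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")
                         .withZone(ZoneId.systemDefault());

    static String toLogDateTimeFormat(long timestampMs) {
        return LOG_DATE_TIME_FORMAT.format(Instant.ofEpochMilli(timestampMs));
    }

    public static void main(String[] args) {
        // e.g., "last commit = 2023-01-01 12:00:00,000" instead of a raw long
        System.out.println(toLogDateTimeFormat(System.currentTimeMillis()));
    }
}
```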
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
The ChangelogReader starts off in `ACTIVE_RESTORING` state, and
then goes to `STANDBY_RESTORING` when changelogs from standby
tasks are added. When the last standby changelogs are removed,
it remained in `STANDBY_RESTORING`, which means that an empty
ChangelogReader could be in either `ACTIVE_RESTORING` or
`STANDBY_RESTORING` depending on the exact sequence of
add/remove operations. This could lead the state updater into
an illegal state. Instead of changing the state updater,
I propose to strengthen the state invariant of the
`ChangelogReader` slightly: it should always be in
`ACTIVE_RESTORING` state, when empty.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
State stores are initialized from the StreamThread even when
the state updater thread is enabled. However, we were missing
the corresponding handling of exceptions when thrown directly
during the initialization. In particular, TaskCorruptedException
would directly fall through to runLoop, and the task
would fall out of book-keeping, since the exception is thrown
when neither the StreamThread nor the StateUpdater is owning
the task.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
This PR builds on top of #11017. I have added the previous author's comment in this PR for attribution. I have also addressed the pending comments from @chia7712 in this PR.
Notes to help the reviewer:
Mockito has a mockStatic method which is equivalent to PowerMock's method.
When we run the tests using @RunWith(MockitoJUnitRunner.StrictStubs.class), Mockito performs a verify() for all stubs that are mentioned, hence there is no need to explicitly verify the stubs (unless you want to verify the number of invocations etc.). Note that this does not work for static mocks.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Walker Carlson <wcarlson@confluent.io>, Bill Bejeck <bbejeck@apache.org>
Quickstart examples didn't produce any console output, since it was missing a logger implementation in the classpath.
Also some minor cleanups.
Tested by creating a test project and running the code.
The PR implementing KIP-770 (#11424) was reverted as it brought in a regression with respect to pausing/resuming the consumer. That KIP also introduced a change to deprecate config CACHE_MAX_BYTES_BUFFERING_CONFIG and replace it with STATESTORE_CACHE_MAX_BYTES_CONFIG.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
In two situations, the current code could transition the ChangelogReader
to UpdateStandby when already in that state, causing an IllegalStateException.
Namely these two cases are:
1. When only standby tasks are restoring and one of them crashes.
2. When only standby tasks are restoring and one of them is paused.
This change fixes both issues by only transitioning if the paused or
failed task is an active task.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The StreamThread in state PARTITIONS_ASSIGNED was running in
a busy loop until restoration finished, stealing CPU
cycles from restoration.
Make sure the StreamThread uses poll_time when
state updater is enabled, and we are in state
PARTITIONS_ASSIGNED.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The original code path paused the main consumer for
all tasks before entering the restoration section
of the code, and then resumed all after restoration
has finished.
In the new state updater part of the code, tasks that
do not require restoration skip the restoration completely.
They remain with the TaskManager and are never transferred
to the StateUpdater, and thus are never resumed.
This change makes sure that tasks that remain with the
TaskManager are not paused.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
0. Add name to task executors.
1. DefaultTaskManager implementation, for interacting with the TaskExecutors and support add/remove/lock APIs.
2. Related unit tests.
0. Address comments from P1.
1. Add the DefaultTaskExecutor implementation class.
2. Related DefaultTaskExecutorTest.
Pending in future PRs: a) exception handling, primarily to send them to polling thread, b) light-weight task flushing procedure.
Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.
Reviewers: Nigel Liang <nigel@nigelliang.com>, Kalpesh Patel <kpatel@confluent.io>, John Roesler <vvcephei@apache.org>, Tom Bentley <tbentley@redhat.com>
WindowedStore and SessionStore do not implement a strict retention time in general. We should consider making retention time strict: even if we still have some records in the store (due to the segmented implementation), we might want to filter expired records on read. This might benefit PAPI users.
This PR adds the filtering behaviour in the metered store so that it gets automatically applied in cases where a custom state store is implemented.
Reviewer: Luke Chen <showuon@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <mjsax@apache.org>
The interfaces (and their future impls) are added under the processor/internals/tasks package, to distinguish them from the existing old classes:
1. TaskExecutor is the interface for a processor thread. It takes at most one task to process at a given time from the task manager. When being asked from the task manager to un-assign the current processing task, it will stop processing and give the task back to task manager.
2. TaskManager schedules all the active tasks to assign to TaskExecutors. Specifically: 1) when a task executor asks it for an unassigned task to process (assignNextTask), it will return the available task based on its scheduling algorithm. 2) when the task manager decides to commit (all) tasks, or when a rebalance event requires it to modify the maintained active tasks (via onAssignment), it will lock all the tasks that are going to be closed / committed, asking the TaskExecutor to give them back if they were being processed at the moment.
Reviewers: John Roesler <vvcephei@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Updating the input partitions of tasks also updates the mapping from
source nodes to input topics in the processor topology within the task.
The mapping is updated with the topics from the topology metadata.
The topology metadata does not prefix intermediate internal topics with
the application ID. Thus, if a standby task has input partitions from an
intermediate internal topic the update of the mapping in the processor
topology leads to an invalid topology exception during recycling of a
standby task to an active task when the input queues are created. This
is because the input topics in the processor topology and the input
partitions of the task do not match because the former miss the
application ID prefix.
The added verification to only update input partitions of standby tasks
if they really changed avoids the invalid topology exception if the
standby task only has input partitions from intermediate internal
topics since they should never change. If the standby task has input
partitions from intermediate internal topics and external topics
subscribed to via a regex pattern, the invalid topology exception
might still be triggered.
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
When building streams metadata, we want to build it even if the host is empty, as this is a common way to find the other host addresses.
Reviewers: John Roesler <vvcephei@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Batch 4 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
When a topology is paused / resumed, we also need to pause / resume its corresponding tasks inside state updater.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
When the state updater only contains standby tasks and then a
standby task is removed, an IllegalStateException is thrown
because the changelog reader does not allow switching to standby
updating mode more than once in a row.
This commit fixes this bug by checking that the removed task is
an active one before trying to switch to standby updating mode.
If the task to remove is a standby task then either we are already
in standby updating mode and we should not switch to it again or
we are not in standby updating mode, which implies that there are
still active tasks that would prevent us from switching to standby
updating mode.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
This PR makes the following changes:
* Moves the only test in StateRestorationIntegrationTest into RestoreIntegrationTest
* Deletes StateRestorationIntegrationTest
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR is part of a series implementing the self-join rewriting. As part of it, we decided to clean up the TOPOLOGY_OPTIMIZATION_CONFIG and make it a list of optimization rules. Acceptable values are NO_OPTIMIZATION, OPTIMIZE (which applies all optimization rules), or a comma-separated list of specific optimizations.
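For example (the constants TOPOLOGY_OPTIMIZATION_CONFIG and OPTIMIZE exist in StreamsConfig; the specific rule strings below are assumptions for illustration and should be checked against StreamsConfig):
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Example of configuring the optimization rules as a list.
public class OptimizationConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Apply all optimization rules:
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

        // Or pick specific rules via a comma-separated list (rule names here
        // are illustrative):
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
                  "reuse.ktable.source.topics,merge.repartition.topics");
    }
}
```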
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
Transforms the integration test that verifies restoration into a
parameterized test. The parameterized test runs once with
state updater enabled and once with state updater disabled.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
Registering and unregistering the changelog topics in the
changelog reader outside of the state updater leads to
race conditions between the stream thread and the state
updater thread. Thus, this PR moves registering and
unregistering of changelog topics in the changelog
reader into the state updater if the state updater
is enabled.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
In the first attempt to handle revoked tasks in the state updater
we removed the revoked tasks from the state updater and added it to
the set of pending tasks to close cleanly. This is not correct since
a revoked task that is immediately reassigned to the same stream thread
would neither be re-added to the state updater nor be created again.
Also a revoked active task might be added to more than one bookkeeping
set in the tasks registry since it might still be returned from
stateUpdater.getTasks() after it was removed from the state updater.
The reason is that the removal from the state updater is done
asynchronously.
This PR solves this issue by introducing a new bookkeeping set
in the tasks registry to bookkeep revoked active tasks (actually
suspended active tasks).
Additionally this PR closes some testing holes around the modified
code.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
Why
Current set of integration tests leak files in the /tmp directory which makes it cumbersome if you don't restart the machine often.
Fix
Replace the usage of File.createTempFile with existing TestUtils.tempFile method across the test files. TestUtils.tempFile automatically performs a clean up of the temp files generated in /tmp/ folder.
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <mlists@juma.me.uk>
Separates the code path for the new state updater from
the code path of the old restoration.
Ensures that with the state updater, tasks are processed
before all tasks are running.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
Reviewers: Dalibor Plavcic, Matthew de Detrich <mdedetrich@gmail.com>, Bruno Cadonna <cadonna@apache.org>
1. In state updater, when handling task corrupted exception due to invalid restoring offset, first delete the affected partitions from the checkpoint before reporting it back to the stream thread. This is to mimic the same behavior in stream threads's StateManager#handleCorruption#closeDirtyAndRevive. It's cleaner to do so inside the restore thread, plus it enables us to optimize by only deleting those corrupted partitions, and not all.
2. In the state manager, handle the drained exceptions as follows (this is the same as handling all exceptions from handleAssignment): 1) Task-migrated, throw all the way to stream-thread as handleTaskMigrated, 2) any fatal Streams exception, throw all the way to stream-thread to trigger exception handler, 3) Task-corrupted, throw to the stream-thread as handleCorruption. Note that for 3), we would specially distinguish if the corrupted-tasks are already closed (when they are thrown from handleAssignment or not (when they are thrown from the state updater).
Reviewers: Bruno Cadonna <cadonna@apache.org>
Based on a patch submitted to the confluentinc fork & then abandoned. Needed some updates and minor expansion but more or less just re-applied the changes proposed in confluentinc#697.
Original PR has a very detailed justification for these changes but the tl;dr of it is that apparently the PriorityQueue's iterator does not actually guarantee to return elements in priority order.
Reviewer: Luke Chen <showuon@gmail.com>
Changelogs are already unregistered when tasks are closed.
There is no need to also unregister them in the state
updater.
In the future, when we only have the state updater without
the old code path, we should consider registering and
unregistering the changelogs within the state updater.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
When the task manager is shut down, the state updater should also
shut down. After the shutdown of the state updater, the tasks
in its output queues should be closed.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
Changes:
- Migrate to Mockito
- Add more assertive checks using verify
- Minor indentation fixes
Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
The state updater removes its updating and paused tasks on shutdown.
The removed tasks are added to the output queue for removed tasks.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Once the state updater has restored an active task, it puts it
into an output queue. The stream thread reads the restored
active task from the output queue and, after verifying that
the task is still owned by the stream thread, transitions it
to RUNNING.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration `auto.include.jmx.reporter` that can be set to false to disable the JMX Reporter. This configuration is deprecated and will be removed in the next major version.
Reviewers: Tom Bentley <tbentley@redhat.com>, Christo Lolov <christo_lolov@yahoo.com>
Currently the task manager stores the tasks it manages
internally. We recently extracted the code to store and retrieve
tasks into its own class Tasks. However, the task manager creates
the Tasks object internally and during testing of the task
manager we do not have access to it which makes testing the task
manager quite complex.
This commit externalizes the data structure that the task manager
uses to store and retrieve tasks. It introduces the TasksRegistry
interface and lets the Tasks object implement TasksRegistry.
The Tasks object is passed into the task manager via its
constructor. Passing the TasksRegistry dependency to the task
manager from outside facilitates simpler testing of the task
manager.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Removes tasks from the state updater when the input partitions of the tasks are revoked or partitions are lost during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch fixes another incorrect version check in the FK code and adds unit tests that would have caught this bug.
Reviewers: John Roesler <vvcephei@apache.org>
Compilation is failing after these two commits:
```
> Task :streams:compileJava
/Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
tasks.addPendingTaskToClose(restoringTask.id());
^
symbol: method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
1 error
```
Also here:
```
[2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
[2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in class TaskManagerTest
[2022-08-17T20:58:20.912Z] private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
```
This patch reverts them.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. Within the tryCompleteRestore function of the thread, try to drain the removed tasks from the state updater and handle them accordingly: 1) recycling, 2) closing, 3) updating input partitions.
2. Catch up on some unit test coverage from previous PRs.
3. Some minor cleanups around exception handling.
Reviewers: Bruno Cadonna <cadonna@apache.org>
We recently added a builder to create task mocks for unit
tests. This PR adds the functionality to add input partitions
to task mocks when the builder is used.
Reviewers: Walker Carlson <wcarlson@confluent.io>, A. Sophie Blee-Goldman <ableegoldman@apache.org>
The state updater exposes tasks that are in restoration
to the stream thread. To ensure that the stream thread
only accesses the tasks to read from the tasks without
modifying any internal state, this PR introduces a
read-only task that throws an exception if the caller
tries to modify the internal state of a task.
This PR also returns read-only tasks from
DefaultStateUpdater#getTasks().
Reviewer: Guozhang Wang <wangguoz@gmail.com>
While working on KAFKA-13877, I felt it was overkill to have the whole test class as an integration test, since all we need is to test the assignor itself, which could be a unit test. Running this suite with 9+ instances takes a long time and is still vulnerable to all kinds of timing-based flakiness. A better choice is to reduce it to a unit test, similar to HighAvailabilityStreamsPartitionAssignorTest, which just tests the behavior of the assignor itself rather than creating many instances and hence depending on various timing bombs not to explode.
Since we mock everything, there's no flakiness anymore. Plus we greatly reduced the test runtime (on my local machine, the old integration test takes about 35 secs to run the whole suite, while the new one takes 20ms on average).
Reviewers: Divij Vaidya <diviv@amazon.com>, Dalibor Plavcic
In the current test, we check for tag distribution immediately after everyone is in the RUNNING state. However, due to follow-up rebalances, "everyone is now in running state" does not mean that the cluster is stable. In fact, a follow-up rebalance may occur, upon which the local thread metadata would return empty, which would cause the distribution verifier to fail.
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>
When the migration of the Streams project to JUnit 5 started with PR #12285, we discovered that the migrated tests were not run by the PR builds. This PR ensures that Streams' tests that are written in JUnit 4 and JUnit 5 are run in the PR builds.
Co-authored-by: Divij Vaidya <diviv@amazon.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>
Before this PR, the calls to the static methods on StreamsMetricsImpl were just calls, not verifications on the mock. This was missed during the switch from EasyMock to Mockito.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Bookkeeps tasks to be recycled, closed, and updated during handling of the assignment. The bookkeeping is needed for integrating the state updater.
These changes are hidden behind the internal config STATE_UPDATER_ENABLED. If the config is false, Streams does not use the state updater and behaves as usual.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. Consolidate the task recycle procedure into a single function within the task. The procedure now becomes: a) task.recycleStateAndConvert, at the end of which the task is closed while its stateManager is retained and the manager type has been converted; b) create the new task with the old task's fields and the stateManager inside the creators.
2. Move the task execution related metadata into the corresponding TaskExecutionMetadata class, including the task idle related metadata (e.g. successfully processed tasks); reduce the number of params needed for TaskExecutor as well as Tasks.
3. Move the task execution related fields (embedded producer and consumer) and task creators out of Tasks and migrated into TaskManager. Now the Tasks is only a bookkeeping place without any task mutation logic.
4. When adding tests, I realized that we should not add a task to the state updater right after creation, since it is not initialized yet, while the state updater validates that the task's state is already restoring / running. So I updated that logic while adding unit tests.
Reviewers: Bruno Cadonna <cadonna@apache.org>
This PR introduces an internal config to enable the state updater. If the state updater is enabled newly created tasks are added to the state updater. Additionally, this PR introduces a builder for mocks for tasks.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14001. It is the first of a series of pull requests which address the move of Kafka Streams tests from JUnit 4 to JUnit 5.
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
- The static memberId used was incorrect
- Need to remove all threads/members from the group
- Need to use the admin client correctly
Add test to verify fixes.
Reviewers: Matthias J. Sax <matthias@confluent.io>
In order to integrate with the state updater, we would need to refactor the TaskManager and Task interfaces. This PR achieved the following purposes:
Separate active and standby tasks in the Tasks placeholder, plus adding pendingActiveTasks and pendingStandbyTasks into Tasks. The exposed active/standby tasks from the Tasks set would only be mutated by a single thread, and the pending tasks hold those tasks that are assigned but cannot be actively managed yet. For now they include two scenarios: a) tasks from unknown sub-topologies which hence cannot be initialized, b) tasks that are pending recycling from active to standby and vice versa. Note case b) would be added in a follow-up PR.
Extract any logic that mutates a task out of the Tasks / TaskCreators. Tasks should only be a place for maintaining the set of tasks, not for manipulations of a task; and TaskCreators should only be used for creating the tasks, not for anything else. This logic is all migrated into TaskManager.
While doing 2) I noticed we have a couple of minor issues in the code where we duplicate the closing logic, so I also cleaned them up in the following way:
a) When closing a task, we first trigger the corresponding closeClean/Dirty function; then we remove the task from Tasks bookkeeping, and for active task we also remove its task producer if EOS-V1 is used.
b) For closing dirty, we swallow the exception from close call and the remove task producer call; for closing clean, we store the thrown exception from either close call or the remove task producer, and then rethrow at the end of the caller. The difference though is that, for the exception from close call we need to retry close it dirty; for the exception from the remove task producer we do not need to re-close it dirty.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Override the default handler for stream threads if the streams handler is used. We do not want the Java default handler triggering when a thread is replaced.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
* Need to check enforceRestoreActive / transitToUpdateStandby when resuming a paused task.
* Do not expose another getResumedTasks, since I think its callers only need getPausedTasks.
Reviewers: Bruno Cadonna <cadonna@apache.org>
* Add a pause action to the task-updater.
* When removing a task, also check the paused tasks in addition to the removed tasks.
* I also realized we do not check whether a task with the same id has already been added, so I add that check in this PR as well.
Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit changes the version check from != to > as the process method
works correctly on both version 1 and 2. != incorrectly throws on v1
records.
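To make the intent concrete, here is a minimal sketch of the relaxed check (method and message names are assumed, not the actual code):
```java
// Sketch only: accept v1 and v2, reject anything newer.
private static void validateVersion(final int version) {
    // previously: if (version != 2) throw ... -- which incorrectly rejected v1 records
    if (version > 2) {
        throw new IllegalStateException("Unknown record version: " + version);
    }
}
```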
Reviewers: Matthias J. Sax <matthias@confluent.io>
This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group.
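As a hedged illustration of the batched call (group ids are made up; the spec/result class names are those introduced by the KIP):
```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// One request covers several groups; against pre-3.0.0 brokers the admin
// client transparently falls back to one request per group.
ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(Map.of(
    "group-a", new ListConsumerGroupOffsetsSpec(),
    "group-b", new ListConsumerGroupOffsetsSpec()));
Map<TopicPartition, OffsetAndMetadata> groupAOffsets =
    result.partitionsToOffsetAndMetadata("group-a").get();
```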
Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: David Jacot <djacot@confluent.io>
Reviewers: David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
1. Added more unit tests for RocksDBTimeOrderedSessionStore and RocksDBTimeOrderedSessionSegmentedBytesStore
2. Disable the cache for sliding windows if the emit strategy is ON_WINDOW_CLOSE
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
KIP-770 introduced a performance regression and needs some re-design.
I needed to resolve some conflicts while reverting.
This reverts commits 1317f3f77a and 0924fd3f9f.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Guozhang Wang <guozhang@confluent.io>
When the store changelog reader is called by a different thread than the stream thread, it can no longer use the main consumer to get committed offsets, since the consumer is not thread-safe. Instead, we remove the main consumer and leverage the existing admin client to get committed offsets.
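A minimal sketch of the replacement path, assuming the admin client's standard offset-listing API (variable names are made up):
```java
// Committed offsets fetched via the thread-safe admin client instead of the
// main consumer; get() blocks until the broker responds.
Map<TopicPartition, OffsetAndMetadata> committed = adminClient
    .listConsumerGroupOffsets(groupId)
    .partitionsToOffsetAndMetadata()
    .get();
OffsetAndMetadata committedForPartition = committed.get(changelogPartition);
```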
Reviewers: Bruno Cadonna <cadonna@apache.org>
The call to Task#completeRestoration calls methods on the main consumer.
The state updater thread should not access the main consumer since the
main consumer is not thread-safe. Additionally, Task#completeRestoration
changes the state of active tasks, but we decided to keep task life cycle
management outside of the state updater.
Task#completeRestoration should be called by the stream thread on
restored active tasks returned by the state updater.
Reviewer: Guozhang Wang <guozhang@apache.org>
Currently the test fails because there is a missing predicate in the retrievableException, i.e. the current predicates
containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
containsString("The state store, source-table, may have migrated to another instance"),
containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING")
were not complete. Another one needed to be added, namely "The specified partition 1 for store source-table does not exist.". This is because it's possible for
assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
or
assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
(depending on which branch is taken) to throw, i.e. see
org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist.
at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:223)
at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.retryUntil(StoreQueryIntegrationTest.java:579)
at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:186)
This happens when the stream hasn't been initialized yet. I have run the test around 12k times using IntelliJ's JUnit testing framework without any flaky failures. The PR also does some minor refactoring, moving the list of predicates into their own functions.
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewer: Bruno Cadonna <cadonna@apache.org>
1. As titled, fix the constructor param ordering.
2. Also added a few more log lines.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
Before this PR the calls to the static methods on StreamsMetricsImpl were just calls and not a verification on the mock. This miss happened during the switch from EasyMock to Mockito.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
For most tests we need an auto-ticking mock timer to work with draining-with-timeout functions.
For tests that check that we never checkpoint, we need a non-auto-ticking timer to control exactly how much time elapses.
Reviewers: Bruno Cadonna <cadonna@apache.org>
* Add a new API for session windows to range query session window by end time (KIP related).
* Augment session window aggregator with emit strategy.
* Minor: consolidated some duplicate classes.
* Test: unit test on session window aggregator.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR exposes the tasks managed by the state updater. The state updater manages all tasks that were added to the state updater and that have not yet been removed from it by draining one of the output queues.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
During restoration, we should always commit, i.e. write the checkpoint file, regardless of EOS or ALOS: if there is a failure we would just over-restore upon recovery, so no EOS violation happens.
Also, when we complete restoration or remove a task, we should enforce a checkpoint; in failure cases, though, we should not write a new one.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Before this PR the call to `StreamsMetricsImpl.addAvgAndMinAndMaxToSensor()`
was just a call and not a verification on the mock. This miss happened
during the switch from EasyMock to Mockito.
Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Before this PR the calls to StreamsMetricsImpl.addInvocationRateAndCountToSensor()
were just calls and not a verification on the mock. This miss happened
during the switch from EasyMock to Mockito.
Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Before this PR the calls to the static methods on
StreamsMetricsImpl were just calls and not a verification
on the mock. This miss happened during the switch from
EasyMock to Mockito.
Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
* KAFKA-13930: Add 3.2.0 Streams upgrade system tests
Apache Kafka 3.2.0 was recently released. Now we need
to test upgrades from 3.2 to trunk in our system tests.
Reviewer: Bill Bejeck <bbejeck@apache.org>
Changes the tag name from topic-name to just topic to conform to the way this tag is named elsewhere (i.e. in the clients).
Also:
- fixes a comment about dynamic topic routing
- fixes some indentation in MockRecordCollector
- Undoes the changes to KStreamSplitTest.scala and TestTopicsTest which are no longer necessary after this hotfix
Reviewers: Bruno Cadonna <cadonna@apache.org>
There are some considerations embedded in this seemingly straightforward PR that I'd like to explain here. The StreamPartitioner is used to send records to three types of topics:
1) repartition topics, where key should never be null.
2) changelog topics, where key should never be null.
3) sink topics, where only a non-windowed key can be null; a windowed key should still never be null.
Also, the StreamPartitioner is used as part of IQ to determine which host contains a certain key, as determined by case 2) above.
This PR's main goal is to remove the deprecated default partitioner of the producer, with those cases in mind, such that:
We make sure that for non-null keys, the default murmur2 hash behavior of the Streams partitioner stays consistent with the producer's new built-in partitioner.
For null keys (which are only possible for the non-windowed default stream partitioner, and are never used for IQ), we fix the issue that we may never rotate to a new partition, by setting the partition to null and hence relying on the newly introduced built-in partitioner.
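A sketch of the resulting partitioning behavior (the Utils helpers are the real murmur2 utilities; the method shape itself is illustrative):
```java
import org.apache.kafka.common.utils.Utils;

// Non-null keys keep the murmur2-based placement, matching the producer's
// built-in partitioner; null keys return null so the producer can rotate.
static Integer streamsDefaultPartition(final byte[] keyBytes, final int numPartitions) {
    if (keyBytes == null) {
        return null; // defer to the producer's built-in partitioner
    }
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
```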
Reviewers: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>, Matthias J. Sax <matthias@confluent.io>
There are a bunch of tests which do not clean up after themselves. This leads to
accumulation of files in the tmp directory of the system on which the tests are
running.
This code change fixes some of the main culprit tests which leak the files in the
temporary directory.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Kvicii <kvicii.yu@gmail.com>
InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen in order to log them for durability. This set is never used nor cleared if logging is not enabled. Having it be populated creates a memory leak. This change stops populating the set if logging is not enabled.
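The shape of the fix, sketched with assumed field names:
```java
// Only track keys for durability when changelogging is enabled; otherwise
// the set would grow unboundedly without ever being read or cleared.
if (loggingEnabled) {
    dirtyKeys.add(key);
}
```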
Reviewers: Divij Vaidya <diviv@amazon.com>, Kvicii <42023367+Kvicii@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies (KIP-834).
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewers: Bonnie Varghese <bvarghese@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Guozhang Wang <guozhang@apache.org>, Bruno Cadonna <cadonna@apache.org>
Use the newly added function to replace the old addMetric function that may throw illegal argument exceptions.
Although in some cases concurrency should not be possible, that will not necessarily remain true in the future, so it's better to use the new API just to be less error-prone.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Minor followup to #12235 that adds a null check on the record key in the new ClientUtils#producerRecordSizeInBytes utility method, as there are valid cases in which we might be sending records with null keys to the Producer, such as a simple builder.stream("non-keyed-input-topic").filter(...).to("output-topic")
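A hedged sketch of the null-safe computation (the shape is assumed; the real utility also accounts for other record fields):
```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Both key and value may legitimately be null for records sent to the producer.
static long approximateRecordSizeInBytes(final ProducerRecord<byte[], byte[]> record) {
    final long keySize = record.key() == null ? 0L : record.key().length;
    final long valueSize = record.value() == null ? 0L : record.value().length;
    return keySize + valueSize;
}
```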
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
This PR adds removing of active and standby tasks from the default implementation of the state updater. The PR also includes refactoring that clean up the code.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Now that we've turned off logging in the brokers/zookeeper/config classes we can finally see at least some of the logs where Streams is actually doing something when trying to debug tests from a failed PR build. But I've noticed we still have some flooding of warnings from the NetworkClient and info-level junk from Metadata, so to maximize the visible useful logs we should filter out everything but the fine-grained logging of the producer/consumer clients themselves (in addition to Streams).
Reviewers: Luke Chen <showuon@gmail.com>, Kvicii Y
Implementation of KIP-846: Source/sink node metrics for Consumed/Produced throughput in Streams
Adds the following INFO topic-level metrics for the total bytes/records consumed and produced:
bytes-consumed-total
records-consumed-total
bytes-produced-total
records-produced-total
Reviewers: Kvicii <Karonazaba@gmail.com>, Guozhang Wang <guozhang@apache.org>, Bruno Cadonna <cadonna@apache.org>
This is for KIP-812:
* added leaveGroup to a new close function in Kafka Streams
* added logic to resolve the future returned by the remove-member call in the close method
* added a max check on the remainingTime value in the close function
Reviewers: David Jacot <david.jacot@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This test has been flaky due to unexpected rebalances during the test.
This change fixes it by detecting an unexpected rebalance and retrying
the test logic (within a timeout).
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>
This is a copy of PR #12037 (implementation to emit final for sliding window agg), authored by lihaosky.
Reviewers: Matthias J. Sax <matthias@confluent.io>
The design is described in detail in KIP-794
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner.
Implementation notes:
The default partitioning logic is moved to the BuiltInPartitioner class
(there is one object per topic). The object keeps track of how many
bytes are produced per-partition and once the amount exceeds batch.size,
switches to the next partition (note that partition switch decision is
decoupled from batching). The object also keeps track of probability
weights that are based on the queue sizes (the larger the queue size
is the less chance for the next partition to be chosen). The queue
sizes are calculated in the RecordAccumulator in the `ready` method,
the method already enumerates all partitions so we just add some extra
logic into the existing O(N) method. The partition switch decision may
take O(log N), where N is the number of partitions per topic, but it happens
only once per batch.size (and the logic is avoided when all queues are
of equal size). The produced-bytes accounting logic is lock-free.
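A lock-free sketch of the per-topic switch decision (all names are assumed; the real logic weights the choice by queue sizes rather than picking uniformly):
```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class StickyPartitionSwitcher {
    private final AtomicLong producedBytes = new AtomicLong();
    private final AtomicInteger stickyPartition = new AtomicInteger(0);
    private final long batchSize;

    StickyPartitionSwitcher(final long batchSize) {
        this.batchSize = batchSize;
    }

    // Called on every append; rotates the sticky partition once roughly
    // batch.size bytes have been produced to the current one.
    int onBytesProduced(final int appendedBytes, final int numPartitions) {
        if (producedBytes.addAndGet(appendedBytes) >= batchSize) {
            producedBytes.set(0);
            stickyPartition.set(ThreadLocalRandom.current().nextInt(numPartitions));
        }
        return stickyPartition.get();
    }
}
```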
When partitioner.availability.timeout.ms is non-zero, RecordAccumulator
keeps stats on "node latency", which is defined as the difference between
the last time the node had a batch waiting to be sent and the last time
the node was ready to take a new batch. If this difference exceeds
partitioner.availability.timeout.ms, we don't switch to that partition
until the node is ready.
Reviewers: Jun Rao <junrao@gmail.com>
A prior commit accidentally changed the javadoc for RecordContext.
In reality, it is not reachable from api.Processor, only from Processor.
Reviewers: Guozhang Wang <guozhang@apache.org>
Time ordered session store implementation. I introduced AbstractRocksDBTimeOrderedSegmentedBytesStore to make it generic for RocksDBTimeOrderedSessionSegmentedBytesStore and RocksDBTimeOrderedSegmentedBytesStore.
A few minor follow-up changes:
1. Avoid extra byte array allocation for fixed upper/lower range serialization.
2. Rename some class names to be more consistent.
Authored-by: Hao Li <1127478+lihaosky@users.noreply.github.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@apache.org>
This PR adds the default implementation of the state updater. The implementation only implements adding active tasks to the state updater.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This is a copy PR of #11896, authored by @lihaosky (Hao Li): Initial implementation to emit final for TimeWindowedKStreamImpl. This PR is on top of #12030
Author: Hao Li
Reviewers: John Roesler <vvcephei@apache.org>
A new cache for RocksDBTimeOrderedWindowStore. We need this because RocksDBTimeOrderedWindowStore's key ordering is different from CachingWindowStore's, which causes issues for MergedSortedCacheWindowStoreIterator.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Updates the KStream process API to cover the use cases
of both process and transform, and deprecates the KStream transform API.
Implements KIP-820
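A hedged example of the generalized API (topic name and processor logic are made up):
```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

final StreamsBuilder builder = new StreamsBuilder();
// process can now forward typed records downstream, covering what
// transform used to do.
final KStream<String, String> upper = builder.<String, String>stream("input-topic")
    .process(() -> new ContextualProcessor<String, String, String, String>() {
        @Override
        public void process(final Record<String, String> record) {
            context().forward(record.withValue(record.value().toUpperCase()));
        }
    });
```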
Reviewer: John Roesler <vvcephei@apache.org>
Prior to this commit FK response sink routed FK results to
SubscriptionResolverJoinProcessorSupplier using the primary key.
There are cases where this behavior is incorrect. For example,
the KTable key serde may differ from the data source's serde, which
might happen without a key-changing operation.
Instead of determining the resolver partition by serializing the PK,
this patch includes the target partition in the SubscriptionWrapper payloads.
The default FK response-sink partitioner extracts the correct partition
from the value and routes the message accordingly.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Fixes a small copy/paste error from #10390 that changed the parameter names
for fetchSession from the singular session form (eg `startTime`) to the range
form (eg `earliestSessionStartTime`).
Reviewers: John Roesler <vvcephei@apache.org>
Fix the upper bound for sliding windows, making them compatible with no grace period (KAFKA-13739).
Added unit tests for early sliding windows and "normal" sliding windows, both for events within one time difference (small input) and above the window time difference (large input).
Fixing this window interval may slightly change stream behavior, but the probability of that happening is extremely low and it should not have a huge impact on the results.
Reviewers: Leah Thomas <lthomas@confluent.io>, Bill Bejeck <bbejeck@apache.org>
Since the topology-level cache size config only controls whether we disable the caching layer entirely for that topology, setting it to anything other than 0 has no effect. The actual cache memory is still just split evenly between the threads, and shared by all topologies.
It's possible we'll want to change this in the future, but for now we should make sure to log a warning so that users who do try to set this override to some nonzero value are made aware that it doesn't work like this.
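A hypothetical sketch of such a warning (names assumed; not the actual code):
```java
// Warn when a topology-level cache size override is set to a nonzero value,
// since only 0 (disabling the caching layer) currently takes effect.
if (topologyCacheMaxBytes != null && topologyCacheMaxBytes > 0) {
    log.warn("Topology-level cache size override {} will not be respected: only a value of 0 "
        + "takes effect, by disabling the caching layer for this topology. Cache memory is "
        + "still split evenly between the threads and shared by all topologies.",
        topologyCacheMaxBytes);
}
```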
Also includes some minor refactoring plus a fix for an off-by-one error in #11796
Reviewers: Luke Chen <showuon@gmail.com>, Walker Carlson <wcarlson@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>
I noticed two issues in the log4j entry:
1. It's formatted as "{}...{}" + param1, param2; effectively there is only one parameter, and the printed line is mis-aligned: we always print `Subtopology [sourceTopics set] was missing source topics {}`.
2. Even fixing 1) is not enough, since topologyName may be null. On the other hand, I think the original goal was to print not the topology name but the sub-topology id, since it's within the per-sub-topology loop.
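Sketched, the bug and its fix look roughly like this (message text abridged):
```java
// Broken: concatenation appends the id to the format string, so only one
// argument is supplied and the second "{}" prints literally.
log.error("Subtopology {} was missing source topics {}" + subtopologyId, sourceTopics);

// Fixed: pass both values as logger parameters, and log the sub-topology id
// rather than the (possibly null) topology name.
log.error("Subtopology {} was missing source topics {}", subtopologyId, sourceTopics);
```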
Reviewers: Guozhang Wang <wangguoz@gmail.com>
#11356 inadvertently changed
the (undefined) header forwarding behavior of stream-stream joins.
This change does not define the behavior, but just restores the prior
undefined behavior for continuity's sake. Defining the header-forwarding
behavior is future work.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
The task assignor is modified to consider the Streams client with the most caught-up states if no Streams client exists that is caught up, i.e., no client for which the lag of the states is less than the acceptable recovery lag.
Added a unit test for the case of task assignment where no caught-up nodes exist.
Existing unit and integration tests verify that no other behaviour has been changed.
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewer: Bruno Cadonna <cadonna@apache.org>
In KIP-811, we added a new config repartition.purge.interval.ms to set the repartition purge interval. This flaky test expected the purge interval to be the same as the commit interval, which is not correct anymore (the default is 30 sec). Set the purge interval explicitly to fix this issue.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Guozhang Wang <wangguoz@gmail.com>
This patch includes metadata wait time in total blocked time. First, this patch adds a new metric for total producer time spent waiting on metadata, called metadata-wait-time-ms-total. Then, this time is included in the total blocked time computed from StreamsProducer.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The caching store layers were passing down writes into lower store layers upon eviction, but not setting the context to the evicted records' context. Instead, the context was from whatever unrelated record was being processed at the time.
Reviewers: Matthias J. Sax <mjsax@apache.org>
Reviewers: Hao Li <hli@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
This PR is part of KIP-708 and adds rack aware standby task assignment logic.
Rack aware standby task assignment won't be functional until all parts of this KIP get merged.
Splitting the work into three smaller PRs to make the review process easier to follow. The overall plan is the following:
⏭️ Rack aware standby task assignment logic #10851
⏭️ Protocol change, add clientTags to SubscriptionInfoData #10802
👉 Add required configurations to StreamsConfig (public API change, at this point we should have full functionality)
This PR implements the last point of the above-mentioned plan.
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This test was failing occasionally. It appears to be a matter of the test assuming perfect deduplication/caching when asserting the test output records, i.e. a bug in the test, not in the real code. Since we are not assuming that deduplication is perfect, I changed the test to make sure the records we expect arrive, instead of asserting that only those arrive.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
I found a couple of sources of flakiness in the integration test.
IQv1 on stores failed even though getting the store itself is covered with timeouts, since the InvalidStoreException is thrown upon the query itself (store.all()). I changed it to use the util function with IQv2, whose timeout/retry covers the whole procedure. Example of such a failure: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11802/11/tests/
With ALOS we should not check that the output, as well as the state store content, is exactly as if processed once, since it is possible that during processing we got spurious task-migrated exceptions and re-processed with duplicates. I actually cannot reproduce this error locally, but from the Jenkins errors it seems possible indeed. Example of such a failure: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11433/4/tests/
Some minor cleanups.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>
We added `currentSystemTimeMs()` and `currentStreamTimeMs()` to the
`ProcessorContext` via KIP-622, but forgot to add both to the new
`api.ProcessorContext`.
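For reference, usage on the new context looks like this (`context` being an `api.ProcessorContext`):
```java
final long wallClockMs = context.currentSystemTimeMs();  // cached system time
final long streamTimeMs = context.currentStreamTimeMs(); // max observed record timestamp
```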
Reviewers: Ricardo Brasil <anribrasil@gmail.com>, Guozhang Wang <guozhang@confluent.io>
Initial state store implementation for TimedWindow and SlidingWindow.
RocksDBTimeOrderedWindowStore.java contains one RocksDBTimeOrderedSegmentedBytesStore, which contains the index and base schemas.
PrefixedWindowKeySchemas.java implements the key schemas for the time-ordered base store and the key-ordered index store.
Reviewers: James Hughes, Guozhang Wang <wangguoz@gmail.com>
In this test, we start the Kafka Streams app and then write to the input topic in a transaction. It's possible that when Streams commits offsets, the transaction hasn't finished yet, so the committed offset could be less than the eventual endOffset.
This PR moves the logic of writing to the input topic before starting the Streams app.
Reviewers: John Roesler <vvcephei@apache.org>
This reverts commit 2ccc834faa.
This reverts commit 2ccc834. We were seeing serious regressions in our state-heavy benchmarks, which run with rolling bounces on 10 nodes.
We regularly saw this exception: java.lang.OutOfMemoryError: Java heap space
I ran a git bisect and found this commit. We verified that the commit right before it did not have the same issues. I then reverted the problematic commit, ran the benchmarks again, and did not see any more issues. We are still looking into the root cause, but since this isn't a critical improvement we can remove it temporarily for now.
Reviewers: Bruno Cadonna <cadonna@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, David Jacot <djacot@confluent.io>, Ismael Juma <ismael@confluent.io>
This test has started to become flaky at a relatively low, but consistently reproducible, rate. Upon inspection, we find this is due to IOExceptions during the #cleanUpNamedTopology call -- specifically, most often a DirectoryNotEmptyException with an occasional FileNotFoundException.
Basically, signs pointed to having returned from/completed the #removeNamedTopology future prematurely, and moving on to try and clear out the topology's state directory while there was a stream thread somewhere that was continuing to process/close its tasks.
I believe this is due to updating the thread's topology version before we perform the actual topology update, in this case specifically the act of eg clearing out a directory. If one thread updates its version and then goes to perform the topology removal/cleanup when the second thread finishes its own topology removal, this other thread will check whether all threads are on the latest version and complete any waiting futures if so -- which means it can complete the future before the first thread has actually completed the corresponding action
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Quick fix to make sure we log the actual source of the failure both in the actual log message as well as the StreamsException that we bubble up to the user's exception handler, and also to report the offending topology by filling in the StreamsException's taskId field.
Also prevents a NoSuchElementException from being thrown when trying to compute the minimum topology version across all threads when the last thread is being unregistered during shutdown.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
We used to call TopologyMetadata#maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion when a thread was being unregistered/shutting down, to check if any of the futures listening for topology updates had been waiting on this thread and could be completed. Prior to invoking this we make sure to remove the current thread from the TopologyMetadata's threadVersions map, but this thread is actually then re-added in the #maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion call.
To fix this, we should break up this method into separate calls for each of its two distinct functions, updating the version and checking for topology update completion. When unregistering a thread, we should only invoke the latter method
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
While debugging the flaky NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing test, I did discover one real bug. The problem was that we update the TopologyMetadata's builders map (with the known topologies) inside the #removeNamedTopology call directly, whereas the StreamThread may not yet have reached the poll() in its loop, and in the case of an offset reset we get an NPE.
I changed the NPE to just log a warning for now; going forward I think we should try to tackle some tech debt by keeping the processing tasks and the TopologyMetadata in sync.
Also includes a quick fix on the side where we were re-adding the topology waiter/KafkaFuture for a thread being shut down
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
This test has been failing somewhat regularly due to going into the ERROR state before reaching RUNNING during the startup phase. The problem is that we are reusing the DELAYED_INPUT_STREAM topics, which had previously been assumed to be uniquely owned by a particular test. We should make sure to delete and re-create these topics for any test that uses them.
Currently the #add/removeNamedTopology APIs behave a little wonky when the application is still in CREATED. Since adding and removing topologies runs some validation steps, there is a valid reason to want to add or remove a topology on a dummy app that you don't plan to start, or a real app that you haven't started yet. But to actually check the results of the validation you need to call get() on the future, so we need to make sure that get() won't block forever in the case of no failure -- which it currently does.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
This PR is part of KIP-708 and adds rack aware standby task assignment logic.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Luke Chen <showuon@gmail.com>, Vladimir Sitnikov <vladimirsitnikov@apache.org>
Pretty much any time we have an integration test failure that's flaky or only exposed when running on Jenkins through the PR builds, it's impossible to debug if it cannot be reproduced locally, as the logs attached to the test results have truncated the entire useful part of the logs. This is due to the logs being flooded at the beginning of the test when the Kafka cluster is coming up, eating up all of the allotted characters before we even get to the actual Streams test.
Setting log4j.logger.kafka to ERROR greatly improves the situation and cuts down on most of the excessive logging in my local runs. To improve things even more, and to have some hope of getting the part of the logs we actually need, I also set the loggers for all of the Config objects to ERROR, as these print out the value of every single config (of which there are a lot) and are not useful: we can easily figure out what the configs were, if necessary, by just inspecting the test locally.
Reviewers: Luke Chen <showuon@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Seen a few of the new tests added fail on PR builds lately with
"java.lang.AssertionError: Expected all streams instances in [org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper@7fb3e6b0] to be RUNNING within 30000 ms"
We already had some tests using the 30s timeout while others were bumped all the way up to 60s, I figured we should try out a default timeout of 45s and if we still see failures in specific tests we can go from there
This PR addresses the remaining nits from the final review of #11787
It also deletes two integration test classes which had only one test in them, and moves the tests to another test class file to save on the time to bring up an entire embedded kafka cluster just for a single run
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
We should be able to change the topologies while still in the CREATED state. We already allow adding them, but this should include removing them as well
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Quick followup to #11787 to optimize the impact of the task backoff by reducing the time to replace a thread. When a thread is going through a dirty close, ie shutting down from an uncaught exception, we should be sending a LeaveGroup request to make sure the broker acknowledges the thread has died and won't wait up to the `session.timeout` for it to join the group if the user opts to `REPLACE_THREAD` in the handler
Reviewers: Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>
Part 1 in the initial series of error handling for named topologies:
* Part 1: Track tasks with errors within a named topology & implement constant-time task backoff
* Part 2: Implement exponential task backoff to account for recurring errors
* Part 3: Pause/backoff all tasks within a named topology in case of a long backoff/frequent errors for any individual task
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
When we hit an exception while processing tasks, we should save the work we have done so far.
This will only be relevant with ALOS and EOS-v1, not EOS-v2. It will actually reduce the number of duplicated records in ALOS, because in many cases we will not be processing tasks successfully more than once.
This is currently enabled only for named topologies.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>
Tests are swallowing exceptions for supported operating systems, which could hide regressions.
Co-authored-by: Rob Leland <rleland@apache.org>
Reviewer: Bruno Cadonna <cadonna@apache.org>
Basic refactoring with no logical changes to lay the groundwork & facilitate reviews for error handling work.
This PR just moves all methods that go beyond the management of tasks into a new TaskExecutor class, such as processing, committing, and punctuating. This breaks up the ever-growing TaskManager class so it can focus on the tracking and updating of the tasks themselves, while the TaskExecutor can focus on the actual processing. In addition to cleaning up this code this should make it easier to test this part of the code.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
For debugging it is useful to see the actual state directory when
an exception regarding the state directory is thrown.
Reviewer: Bill Bejeck <bbejeck@apache.org>
Previously we were only verifying the new query could be added after we had already inserted it into the TopologyMetadata, so we need to move the validation upfront.
Also adds a test case for this and improves handling of NPE in case of future or undiscovered bugs.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
RocksDB v6.27.3 has been released and it is the first release to support s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle without s390x support.
RocksDB v6.27.3 has added some new options that require an update to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java but no other changes are needed to upgrade.
I have run the unit/integration tests locally on s390x and also the :streams tests on x86_64 and they pass.
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Co-authored-by: Kurt Ostfeld <kurt@samba.tv>
Reviewers: Kvicii <Karonazaba@gmail.com>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
This PR is an implementation of: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390. The following changes have been made:
* Adding a new config input.buffer.max.bytes applicable at a topology level.
* Adding new config statestore.cache.max.bytes.
* Adding new metric called input-buffer-bytes-total.
* The per partition config buffered.records.per.partition is deprecated.
* The config cache.max.bytes.buffering is deprecated.
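A hedged example of setting the new configs by name (the sizes are illustrative only):
```java
import java.util.Properties;

final Properties props = new Properties();
props.put("input.buffer.max.bytes", 512L * 1024 * 1024);    // replaces buffered.records.per.partition
props.put("statestore.cache.max.bytes", 64L * 1024 * 1024); // replaces cache.max.bytes.buffering
```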
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>
There are cases in which a state store neither has an in-memory position built up nor has it gone through the state restoration process. If a store is persistent (i.e., RocksDB), and we stop and restart Streams, we will have neither of those continuity mechanisms available.
This patch:
* adds a test to verify that all stores correctly recover their position after a restart
* implements storage and recovery of the position for persistent stores alongside on-disk state
Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
Currently, IQv2 forwards all queries to the underlying store. We add this bypass to allow handling of key queries in the cache. If a key exists in the cache, it will get answered from there.
As part of this PR, we realized we need access to the position of the underlying stores. So, I added the method getPosition to the public API and ensured all state stores implement it. Only the "leaf" stores (Rocks*, InMemory*) have an actual position, all wrapping stores access their wrapped store's position.
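A minimal sketch of the bypass (signatures abridged; the cache lookup is hypothetical):
```java
// Answer key queries from the cache when the key is present; otherwise
// delegate to the wrapped store as before.
if (query instanceof KeyQuery) {
    final Bytes key = ((KeyQuery<Bytes, byte[]>) query).getKey();
    final byte[] cached = cache.get(key);
    if (cached != null) {
        return (QueryResult<R>) QueryResult.forResult(cached);
    }
}
return wrapped().query(query, positionBound, collectExecutionInfo);
```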
Reviewers: Patrick Stuedi <pstuedi@apache.org>, John Roesler <vvcephei@apache.org>
There is a brief window between when the store is registered and when
it is initialized when it might handle a query, but there is no context.
We treat this condition just like a store that hasn't caught up to the
desired position yet.
Reviewers: Guozhang Wang <guozhang@apache.org>, Matthias J. Sax <mjsax@apache.org>, A. Sophie Blee-Goldman <ableegoldman@apache.org>, Patrick Stuedi <pstuedi@apache.org>
Followup to #11600 to invoke the streams exception handler on the MissingSourceTopicException, without killing/replacing the thread
Reviewers: Guozhang Wang <guozhang@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
Fixes some issues with the NamedTopology version of the IQ methods that accept a topologyName argument, and adds tests for all.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Avoid throwing a MissingSourceTopicException inside the #assign method when named topologies are used, and just remove those topologies which are missing any of their input topics from the assignment.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently when the stores are added via the PAPI vs. the DSL.
This PR adds PAPI construction for Window and Session stores to the
IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
possible state store.
Reviewer: Guozhang Wang <guozhang@apache.org>
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently when the stores are added via the PAPI vs. the DSL.
This PR adds KeyValueStore PAPI construction to the
IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
possible state store.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Guozhang Wang <guozhang@apache.org>