When Kafka Streams skips overs corrupted messages, it might not resume previously paused partitions,
if more than one record is skipped at once, and if the buffer drop below the max-buffer limit at the same time.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This is a backport of #17686 merged to trunk and cherry-picked to 3.9. Need to do a standalone PR due to merge conflicts.
Reviewers: Matthias Sax <mjsax@apache.org>
TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.
Reviewers: Bill Bejeck <bill@confluent.io>
Each KafkaStreams instance maintains a map from threadId to state
to use to aggregate to a KafkaStreams app state. The map is updated
on every state change, and when a new thread is created. State change
updates are done in a synchronized blocks, however the update that
happens on thread creation is not, which can raise
ConcurrentModificationException. This patch moves this update
into the listener object and protects it using the object's lock.
It also moves ownership of the state map into the listener so that
its less likely that future changes access it without locking
Reviewers: Matthias J. Sax <matthias@confluent.io>
Global state stores are currently flushed at each commit, which may impact performance, especially for EOS (commit each 200ms).
The goal of this improvement is to flush global state stores only when the delta between the current offset and the last checkpointed offset exceeds a threshold.
This is the same logic we apply on local state store, with a threshold of 10000 records.
The implementation only flushes if the time interval elapsed and the threshold of 10000 records is exceeded.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <cadonna@apache.org>
In many parameterized tests, the display name is broken. Example - testMetadataFetch appears as [1] true, [2] false link
This is because the constant in @ParameterizedTest
String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";
This PR adds a new junit-platform.properties which overrides to add a {displayName} which shows the the display name of the method
For existing tests which override the name, should work as is. The precedence rules are explained
name attribute in @ParameterizedTest, if present
value of the junit.jupiter.params.displayname.default configuration parameter, if present
DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest
Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names
Sample test run output
Before: [1] true link
After: testMetadataExpiry(boolean).false link
This commit is an extension of bdf6d46b41 which needed to reverted due to introduces test failures.
Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
KAFKA-16025 describes the race condition sequence in detail. When this occurs, it can cause the impacted task's initializing to block indefinitely, blocking progress on the impacted task, and any other task assigned to the same stream thread. The fix I have implemented is pretty simple, simply re-check whether a directory is still empty after locking it during the start of rebalancing, and if it is, unlock it immediately. This preserves the idempotency of the method when it coincides with parallel state store cleanup executions.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Kafka Streams should not crash if a task is closed dirty. This is a
hotfix to catch/swallow an IllegalStateException from
`producer.abortTrandsaction()` on the close-dirty clean-up path.
A proper fix would be to not call `abortTransaction()` for this
particular case.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
KAFKA-15629 added `TimestampedByteStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter` which break the restore
code path and thus some system tests.
This PR reverts this change for now.
Reviewers: Almog Gavra <almog.gavra@gmail.com>, Walker Carlson <wcarlson@confluent.io>
* KAFKA-16077: Streams fails to close task after restoration when input partitions are updated
There is a race condition in the state updater that can cause the following:
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition
4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten)
5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized
This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.
To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it.
We should not remove the task twice, otherwise, we'll end up in this situation
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. The state updater moves the task from the updating tasks to the removed tasks
4. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back (adding a task+remove into the task and action queue) and add a pending action to close, revive and update input partitions
5. The task is handed back by the state updater. We close revive and update input partitions, and add the task back to the state updater
6. The state updater executes the "task+remove" action that is still in its task + action queue, and hands the task immediately back to the main thread
7. The main thread discoveres a removed task that was not restored and has no pending action attached to it. IllegalStateException
Reviewers: Bruno Cadonna <cadonna@apache.org>
We allocate an `Options` in order to list column families while opening
the `RocksDBStore`, but never explicitly `close()` it.
`Options` is a RocksDB native object, which needs to be explicitly
closed to free the resources it allocates in native memory.
Failing to close this causes a memory leak when repeatedly
opening/closing stores.
It's an `AutoCloseable`, and all usage of it is confined to the
surrounding `try` block, so we can just hook it out to the `try` to
auto-close it when done.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Increase log level to INFO similar to other log statement in this class, to surface important information on the non-critical code path.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
We attempt to update lags when in state PENDING_SHUTDOWN or PARTITIONS_REVOKED. In these states,
however, our representation of the assignment may not be up-to-date with the subscription
object inside the consumer. This can cause a bug, in particular, when we subscribe to a
set of topics via a regular expression, and the underlying topic is deleted. The consumer
subscription may reflect that topic deletion already, while our internal state still
contains references to the deleted topic, because `onAssignment` has not yet been
executed. Therefore, we will attempt to call `currentLag` on partitions that are not
assigned to us any more inside the consumer, leading to an `IllegalStateException`.
This bug causes flakiness of the test
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
We did no complete KIP-714 with regard to collecting producer clients
instance IDs in Kafka Streams if EOSv1 is enabled. Instead of throwing
an UnsupportedOperationException, we should return an empty map.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Before #14648, the KStreamImplJoin class would always create non-timestamped persistent windowed stores. After that PR, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.
To fix this, we allow creating non-timestamped stores using the DslWindowParams
Testing was done both manually as well as adding a unit test to ensure that the stores created are not timestamped. I also confirmed that the only place in the code persistentWindowStore was used before #14648 was in the StreamJoined code.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Kafka Streams checkpoints the wrong offset when a task is closed during
restoration. If under exactly-once processing guarantees a
TaskCorruptedException happens, the affected task is closed dirty, its
state content is wiped out and the task is re-initialized. If during
the following restoration the task is closed cleanly, the task writes
the offsets that it stores in its record collector to the checkpoint
file. Those offsets are the offsets that the task wrote to the changelog
topics. In other words, the task writes the end offsets of its changelog
topics to the checkpoint file. Consequently, when the task is
initialized again on the same Streams client, the checkpoint file is
read and the task assumes it is fully restored although the records
between the last offsets the task restored before closing clean and
the end offset of the changelog topics are missing locally.
The fix is to clear the offsets in the record collector on close.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Part of KIP-714.
Add support to collect client instance id of the global consumer.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This reverts commit bdf6d46b41. We found out that this commit introduced flakiness in Streams' tests. We will revise it.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Part of KIP-714.
Adds support to expose main consumer client instance id.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Minor cleanup to make it easier to follow the restore listener logic. Currently, the KafkaStreams class tracks two restore listener fields: there is a non-final, nullable "globalRestoreListener" that holds the restore listener specified by the user (if any), and then there is a final "delegatingRestoreListener" that's used to encapsulate the null checks for the user-specified restore listener. It's a bit confusing to follow along with what each of these restore listener fields is doing and the relationship between them when they're on equal footing like this, when in reality they're more hierarchical and the DelegatingRestoreListener is actually a wrapper over the user-specified globalRestoreListener. The term "global" is also a bit misleading as it can get mixed up with global state stores, when it's really meant to be "global" in the sense that it applies to all state stores in the application.
It would be nice to just move the user listener completely inside the DelegatingRestoreListener class and then make that class static, as well as renaming the field to "userRestoreListener"
Reviewers: Matthias J. Sax <mjsax@apache.org>
Implementation for KIP-988, adds the new StandbyUpdateListener interface
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Colt McNealy <colt@littlehorse.io>
Relax non-null FK left-join requirement.
Testing Strategy: Inject extractor which returns null on first or second element.
Reviewers: Walker Carlson <wcarlson@apace.org>
- Part of KIP-714
- Add new configs and public API for Kafka Streams
- Implement support to get admin client instance id
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Apoorv Mittal <amittal@confluent.io>, Walker Carlson <wcarlson@confluent.io>
RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted does not wait to ensure that test-topic-A is deleted. The second assignment condition times out in 15sec.
We should wait for the topic to be deleted (default timeout = 30sec) and then check the assignment.
Reviewers: Walker Carlson <wcarlson@apache.org>
When opening RocksDB, we were checking for an error in
`RocksDBTimestampedStore` to detect if the `keyValueWithTimestamp` CF is
missing.
The `openRocksDB` method now supports any number of column families, not
just the extra one used by `RocksDBTimestampedStore`. We now check for
the existing column families _before_ opening the database, which allows
us to create any missing column families.
Supporting automatic creation of any number of missing column families
is a pre-requisite for KIP-892: Transactional StateStores.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>