2811 Commits

Author SHA1 Message Date
Matthias J. Sax fe309afa9f MINOR: Update 3.7 branch version to 3.7.2-SNAPSHOT 2024-12-12 15:43:59 -08:00
Matthias J. Sax 79a8f2b5f4 Bump version to 3.7.2 2024-12-04 10:50:28 -08:00
Matthias J. Sax 96d64af3d8 KAFKA-17299: add unit tests for previous fix (#17919)
https://github.com/apache/kafka/pull/17899 fixed the issue, but did not
add any unit tests.

Reviewers: Bill Bejeck <bill@confluent.io>
2024-11-25 12:57:50 -08:00
Laxman Ch f416c01e37 KAFKA-17299: Fix Kafka Streams consumer hang issue (#17899)
When Kafka Streams skips over corrupted messages, it might not resume previously paused partitions
if more than one record is skipped at once and the buffer drops below the max-buffer limit at the same time.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-11-25 12:47:25 -08:00
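A minimal sketch of the kind of threshold check the KAFKA-17299 message above describes. It assumes, purely for illustration, that a partition is resumed by comparing the buffer size against the limit after records are removed. The class and method names are hypothetical and are not taken from the Kafka Streams code base.

```java
// Hypothetical illustration; names are not from the Kafka Streams code base.
final class BufferResumeSketch {

    // Fragile check: only fires when the size lands exactly on the limit,
    // which is fine as long as records leave the buffer strictly one at a time.
    static boolean shouldResumeExact(int sizeAfterRemoval, int maxBuffered) {
        return sizeAfterRemoval == maxBuffered;
    }

    // More robust check: fires whenever a removal moves the buffer from
    // above the limit to at or below it, however many records were removed.
    static boolean shouldResumeCrossing(int sizeBefore, int sizeAfter, int maxBuffered) {
        return sizeBefore > maxBuffered && sizeAfter <= maxBuffered;
    }

    public static void main(String[] args) {
        int max = 3;
        // One record processed plus two corrupted records skipped: 5 -> 2.
        System.out.println(shouldResumeExact(2, max));       // false
        System.out.println(shouldResumeCrossing(5, 2, max)); // true
    }
}
```

The exact check misses the jump from 5 to 2 because the size never equals the limit, which is one way a paused partition could stay paused.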
Bill Bejeck 1d975923d8 Update javadoc on split to mention first matching (#17799)
Clarify the functionality of split matching on first predicate
Reviewers: Matthias Sax <mjsax@apache.org>
2024-11-13 11:58:06 -05:00
Bill Bejeck 9b29f289a8 Backport fix from 3.9 (#17716)
This is a backport of #17686 merged to trunk and cherry-picked to 3.9. Need to do a standalone PR due to merge conflicts.
Reviewers: Matthias Sax <mjsax@apache.org>
2024-11-12 16:08:38 -05:00
Matthias J. Sax 35c17ac0ae KAFKA-17872: Update consumed offsets on records with invalid timestamp (#17710)
A TimestampExtractor can drop a record by returning a timestamp of -1. In this case, we still need to update the consumed offsets so that we can commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>
2024-11-09 23:35:09 -08:00
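The idea in the KAFKA-17872 message above can be sketched as follows. This is not the Kafka Streams implementation: the class, its fields, and the `handle` method are hypothetical, and the extracted timestamp is passed in directly instead of calling a real extractor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the Kafka Streams implementation.
final class ConsumedOffsetSketch {
    static final long DROP = -1L; // timestamp value that signals "drop this record"

    private long consumedOffset = -1L; // highest offset consumed so far, dropped or not
    private final List<Long> processedOffsets = new ArrayList<>();

    void handle(long offset, long extractedTimestamp) {
        // Advance the consumed offset first, so dropped records still count as progress.
        consumedOffset = offset;
        if (extractedTimestamp == DROP) {
            return; // record is skipped, but its offset was already recorded above
        }
        processedOffsets.add(offset);
    }

    // Kafka consumers commit the offset of the next record to read.
    long offsetToCommit() {
        return consumedOffset + 1;
    }

    List<Long> processedOffsets() {
        return processedOffsets;
    }

    public static void main(String[] args) {
        ConsumedOffsetSketch task = new ConsumedOffsetSketch();
        task.handle(0L, 1000L);
        task.handle(1L, DROP);
        task.handle(2L, DROP);
        System.out.println(task.processedOffsets()); // [0]
        System.out.println(task.offsetToCommit());   // 3
    }
}
```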
Rohan fb2d647116 KAFKA-16955: fix synchronization of streams threadState (#16337)
Each KafkaStreams instance maintains a map from threadId to state
to use to aggregate to a KafkaStreams app state. The map is updated
on every state change, and when a new thread is created. State change
updates are done in synchronized blocks; however, the update that
happens on thread creation is not, which can raise
ConcurrentModificationException. This patch moves this update
into the listener object and protects it using the object's lock.
It also moves ownership of the state map into the listener so that
it's less likely that future changes access it without locking.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-10-25 14:15:36 -07:00
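A rough sketch of the pattern the KAFKA-16955 message above describes: the listener owns the state map, and every access goes through the listener's own lock. The class name, method names, and string-valued states are hypothetical simplifications, not the actual KafkaStreams internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the actual KafkaStreams listener.
final class ThreadStateListenerSketch {
    // The map is private to the listener, so callers cannot touch it without the lock.
    private final Map<Long, String> threadStates = new HashMap<>();

    // Thread creation goes through the same lock as regular state changes.
    synchronized void onThreadCreated(long threadId) {
        threadStates.put(threadId, "CREATED");
    }

    synchronized void onStateChange(long threadId, String newState) {
        threadStates.put(threadId, newState);
    }

    // Aggregates the per-thread states while holding the lock, so the
    // iteration cannot race with a concurrent put.
    synchronized boolean allRunning() {
        if (threadStates.isEmpty()) {
            return false;
        }
        for (String state : threadStates.values()) {
            if (!"RUNNING".equals(state)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ThreadStateListenerSketch listener = new ThreadStateListenerSketch();
        listener.onThreadCreated(1L);
        listener.onStateChange(1L, "RUNNING");
        listener.onThreadCreated(2L);
        System.out.println(listener.allRunning()); // false
        listener.onStateChange(2L, "RUNNING");
        System.out.println(listener.allRunning()); // true
    }
}
```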
Matthias J. Sax 89be50f582 MINOR: fix HTML for topology.optimization config (#16953)
The HTML rendering broke via https://issues.apache.org/jira/browse/KAFKA-14209 in the 3.4 release. The currently shown value is garbage: org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x0000000800c0cf18@b1bc7ed

cf https://kafka.apache.org/documentation/#streamsconfigs_topology.optimization

Verified the fix via running StreamsConfig#main() locally.

Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-22 17:49:58 -07:00
Igor Soarez 67151c6022
MINOR: Update 3.7 branch version to 3.7.2-SNAPSHOT 2024-06-28 10:51:17 +02:00
Igor Soarez e2494e6ffb
Bump version to 3.7.1 2024-06-18 22:27:22 +01:00
Ayoub Omari 64845b9b07 KAFKA-15625: Do not flush global state store at each commit (#15361)
Global state stores are currently flushed at each commit, which may impact performance, especially for EOS (commit every 200ms).
The goal of this improvement is to flush global state stores only when the delta between the current offset and the last checkpointed offset exceeds a threshold.
This is the same logic we apply to local state stores, with a threshold of 10000 records.
The implementation only flushes if the time interval elapsed and the threshold of 10000 records is exceeded.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2024-04-24 14:45:21 +02:00
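The flush condition described in the KAFKA-15625 message above can be sketched as a single predicate. The 10000-record threshold comes from the message; the class name, the parameter names, and the way the time interval is passed in are assumptions made for this sketch.

```java
// Hypothetical illustration; not the Kafka Streams implementation.
final class GlobalFlushPolicySketch {
    static final long OFFSET_DELTA_THRESHOLD = 10_000L; // threshold named in the commit message

    // Flush only if the interval has elapsed AND enough records were consumed
    // since the last checkpoint.
    static boolean shouldFlush(long currentOffset,
                               long lastCheckpointedOffset,
                               long nowMs,
                               long lastFlushMs,
                               long flushIntervalMs) {
        boolean intervalElapsed = nowMs - lastFlushMs >= flushIntervalMs;
        boolean enoughRecords = currentOffset - lastCheckpointedOffset > OFFSET_DELTA_THRESHOLD;
        return intervalElapsed && enoughRecords;
    }

    public static void main(String[] args) {
        // Interval elapsed and 10001 records since the last checkpoint.
        System.out.println(shouldFlush(20_001L, 10_000L, 5_000L, 0L, 1_000L)); // true
        // Interval elapsed but only 5000 records since the last checkpoint.
        System.out.println(shouldFlush(15_000L, 10_000L, 5_000L, 0L, 1_000L)); // false
    }
}
```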
Bruno Cadonna 6e998cffdd MINOR: Add junit properties to display parameterized test names (#14983)
In many parameterized tests, the display name is broken. For example, testMetadataFetch appears as [1] true, [2] false.
This is because of the constant in @ParameterizedTest:

String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";

This PR adds a new junit-platform.properties file that overrides the default and adds a {displayName} placeholder, which shows the display name of the method.

Existing tests that override the name should work as is. The precedence rules are:

    name attribute in @ParameterizedTest, if present
    value of the junit.jupiter.params.displayname.default configuration parameter, if present
    DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest

Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names

Sample test run output
Before: [1] true
After: testMetadataExpiry(boolean).false

This commit is an extension of bdf6d46b41, which needed to be reverted because it introduced test failures.

Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2024-04-23 16:54:52 +08:00
Matthias J. Sax 4e3b0c3dd7 HOTFIX: fix html markup 2024-03-08 14:29:46 -08:00
Matthias J. Sax 4cd36f938f TRIVIAL: fix typo 2024-03-08 14:29:28 -08:00
Stanislav Kozlovski bac0297cde
MINOR: Bump 3.7 branch's version to 3.7.1-SNAPSHOT (#15431)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-03-08 23:38:34 +08:00
sanepal 0760ca5e94 KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories (#15088)
KAFKA-16025 describes the race condition sequence in detail. When this occurs, it can cause the impacted task's initialization to block indefinitely, blocking progress on the impacted task and on any other task assigned to the same stream thread. The fix is simple: re-check whether a directory is still empty after locking it during the start of rebalancing, and if it is, unlock it immediately. This preserves the idempotency of the method when it coincides with parallel state store cleanup executions.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
2024-03-07 12:05:12 -08:00
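A small sketch of the "lock, then re-check, then unlock if empty" step described in the KAFKA-16025 message above. The class, the in-memory set standing in for directory locks, and the `isEmpty` predicate are all hypothetical; the real code works on task directories on disk.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration; names are not from the Kafka Streams code base.
final class TaskDirLockSketch {
    private final Set<String> lockedDirs = new HashSet<>();

    // Tries to lock a task directory that looked non-empty when it was listed.
    // Returns true only if the lock is kept.
    synchronized boolean lockIfStillNonEmpty(String taskDir, Predicate<String> isEmpty) {
        if (!lockedDirs.add(taskDir)) {
            return false; // already locked
        }
        // Re-check after locking: a parallel cleanup may have emptied the
        // directory between listing it and acquiring the lock.
        if (isEmpty.test(taskDir)) {
            lockedDirs.remove(taskDir); // release immediately, leave no orphaned lock
            return false;
        }
        return true;
    }

    synchronized boolean isLocked(String taskDir) {
        return lockedDirs.contains(taskDir);
    }

    public static void main(String[] args) {
        TaskDirLockSketch locks = new TaskDirLockSketch();
        System.out.println(locks.lockIfStillNonEmpty("0_1", dir -> false)); // true
        System.out.println(locks.lockIfStillNonEmpty("0_2", dir -> true));  // false
        System.out.println(locks.isLocked("0_2"));                          // false
    }
}
```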
Matthias J. Sax 29d0194c09
KAFKA-16221: hotfix for EOS error handling (#15315)
Kafka Streams should not crash if a task is closed dirty. This is a
hotfix to catch/swallow an IllegalStateException from
`producer.abortTransaction()` on the close-dirty clean-up path.

A proper fix would be to not call `abortTransaction()` for this
particular case.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-02-05 11:11:10 +01:00
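The catch-and-swallow hotfix described in the KAFKA-16221 message above follows a familiar shape, sketched here. The `TxnProducer` interface and `abortQuietly` helper are hypothetical stand-ins; only the `abortTransaction()` method name and the `IllegalStateException` type come from the message.

```java
// Hypothetical illustration; TxnProducer stands in for a transactional producer.
final class CloseDirtySketch {

    interface TxnProducer {
        void abortTransaction();
    }

    // Returns true if the abort went through, false if it was swallowed.
    static boolean abortQuietly(TxnProducer producer) {
        try {
            producer.abortTransaction();
            return true;
        } catch (IllegalStateException swallowed) {
            // On the close-dirty path the task is being thrown away anyway,
            // so this sketch ignores the failure instead of crashing.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(abortQuietly(() -> { })); // true
        System.out.println(abortQuietly(() -> {
            throw new IllegalStateException("no transaction in progress");
        })); // false
    }
}
```

As the message notes, swallowing the exception is a stopgap; not calling `abortTransaction()` in this case would be the cleaner fix.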
Matthias J. Sax a242229d38 KAFKA-15594: Add version 3.6 to Kafka Streams system tests (#15151)
Reviewers: Walker Carlson <wcarlson@confluent.io>
2024-01-26 15:00:20 -08:00
Matthias J. Sax 8b9715bb70 KAFKA-16141: Fix StreamsStandbyTask system test (#15217)
KAFKA-15629 added the `TimestampedBytesStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter`, which breaks the restore
code path and thus some system tests.

This PR reverts this change for now.

Reviewers: Almog Gavra <almog.gavra@gmail.com>, Walker Carlson <wcarlson@confluent.io>
2024-01-19 09:24:28 -08:00
Matthias J. Sax 1c29c84fa6 KAFKA-16139: Fix StreamsUpgradeTest (#15207)
Adds version 3.6 to the possible values for config upgrade_from.

Reviewers: Bruno Cadonna <bruno@confluent.io>
2024-01-17 13:29:02 -08:00
Bruno Cadonna 091d5570a8 KAFKA-16139: Fix StreamsUpgradeTest (#15199)
Adds version 3.5 to the possible values for config upgrade_from.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2024-01-16 17:27:11 -08:00
Lucas Brutschy 564a2e12af
KAFKA-16089: Revert "KAFKA-14412: Better Rocks column family management" (#15145)
* Revert "KAFKA-16086: Fix memory leak in RocksDBStore (#15135)"

This reverts commit 58d6d2e592.

* Revert "KAFKA-14412: Better Rocks column family management (#14852)"

This reverts commit 20a223061c.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2024-01-09 10:10:01 +01:00
Lucas Brutschy 01ecb1ab48
KAFKA-16097: Disable state updater in 3.7 (#15146)
Several problems are still appearing while running 3.7 with
the state updater. This change will disable the state updater
by default.
2024-01-09 09:24:33 +01:00
Lucas Brutschy a9bc36e388 KAFKA-16077: Streams with state updater fails to close task upon fencing (#15117)
* KAFKA-16077: Streams fails to close task after restoration when input partitions are updated

There is a race condition in the state updater that can cause the following:

 1. We have an active task in the state updater
 2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
 3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition
 4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten)
 5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized

This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.

To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it.

We should not remove the task twice; otherwise, we'll end up in this situation:

1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. The state updater moves the task from the updating tasks to the removed tasks
4. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back (adding a task+remove into the task and action queue) and add a pending action to close, revive and update input partitions
5. The task is handed back by the state updater. We close, revive, and update input partitions, and add the task back to the state updater
6. The state updater executes the "task+remove" action that is still in its task + action queue, and hands the task immediately back to the main thread
7. The main thread discovers a removed task that was not restored and has no pending action attached to it. IllegalStateException

Reviewers: Bruno Cadonna <cadonna@apache.org>
2024-01-05 19:33:40 +01:00
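The overwritten pending action from the first scenario in the KAFKA-16077 message above can be sketched with a plain map. Only the action name `CloseReviveAndUpdateInputPartitions` comes from the message; the enum, the class, and the rule for choosing the combined action are assumptions made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the Kafka Streams task manager.
final class PendingActionSketch {
    enum Action { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS, CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS }

    private final Map<String, Action> pending = new HashMap<>();

    void requestCloseClean(String taskId) {
        pending.put(taskId, Action.CLOSE_CLEAN);
    }

    // Problematic variant: silently replaces a pending close.
    void requestUpdateOverwriting(String taskId) {
        pending.put(taskId, Action.UPDATE_INPUT_PARTITIONS);
    }

    // Variant in the spirit of the fix: if a close is already pending,
    // record a combined action so the close is not lost.
    void requestUpdatePreservingClose(String taskId) {
        Action existing = pending.get(taskId);
        pending.put(taskId, existing == Action.CLOSE_CLEAN
                ? Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS
                : Action.UPDATE_INPUT_PARTITIONS);
    }

    Action pendingFor(String taskId) {
        return pending.get(taskId);
    }

    public static void main(String[] args) {
        PendingActionSketch buggy = new PendingActionSketch();
        buggy.requestCloseClean("0_0");
        buggy.requestUpdateOverwriting("0_0");
        System.out.println(buggy.pendingFor("0_0")); // UPDATE_INPUT_PARTITIONS

        PendingActionSketch fixed = new PendingActionSketch();
        fixed.requestCloseClean("0_0");
        fixed.requestUpdatePreservingClose("0_0");
        System.out.println(fixed.pendingFor("0_0")); // CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS
    }
}
```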
Nick Telford 58d6d2e592 KAFKA-16086: Fix memory leak in RocksDBStore (#15135)
We allocate an `Options` in order to list column families while opening
the `RocksDBStore`, but never explicitly `close()` it.

`Options` is a RocksDB native object, which needs to be explicitly
closed to free the resources it allocates in native memory.

Failing to close this causes a memory leak when repeatedly
opening/closing stores.

It's an `AutoCloseable`, and all usage of it is confined to the
surrounding `try` block, so we can just hoist it out to the `try` to
auto-close it when done.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-05 19:33:27 +01:00
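The try-with-resources pattern that the KAFKA-16086 message above relies on can be sketched as follows. `FakeOptions` is a hypothetical stand-in for a native-backed `AutoCloseable` such as the RocksDB `Options` object; the counter only exists to make the leak visible in the sketch.

```java
// Hypothetical illustration; FakeOptions stands in for a native-backed object.
final class NativeOptionsSketch {
    static int openHandles = 0; // number of FakeOptions created but not yet closed

    static final class FakeOptions implements AutoCloseable {
        FakeOptions() {
            openHandles++;
        }

        @Override
        public void close() {
            openHandles--;
        }
    }

    // Leaky variant: the object is never closed.
    static int listFamiliesLeaky() {
        FakeOptions options = new FakeOptions();
        return 1; // pretend we listed one column family using 'options'
    }

    // Declaring the resource in the try header closes it on every exit path.
    static int listFamiliesClosing() {
        try (FakeOptions options = new FakeOptions()) {
            return 1;
        }
    }

    public static void main(String[] args) {
        listFamiliesLeaky();
        System.out.println(openHandles); // 1
        listFamiliesClosing();
        System.out.println(openHandles); // 1
    }
}
```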
Matthias J. Sax e090229bae MINOR: improve logging for state management (#15045)
Increase log level to INFO, similar to other log statements in this class, to surface important information on the non-critical code path.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-04 11:23:07 +01:00
Almog Gavra fe3bc9c709 KAFKA-16046: also fix stores for outer join (#15073)
This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
2024-01-02 15:08:25 -08:00
Lucas Brutschy 6751570e2b KAFKA-9545: Fix IllegalStateException in updateLags (#15096)
We attempt to update lags when in state PENDING_SHUTDOWN or PARTITIONS_REVOKED. In these states,
however, our representation of the assignment may not be up-to-date with the subscription
object inside the consumer. This can cause a bug, in particular, when we subscribe to a
set of topics via a regular expression, and the underlying topic is deleted. The consumer
subscription may reflect that topic deletion already, while our internal state still
contains references to the deleted topic, because `onAssignment` has not yet been
executed. Therefore, we will attempt to call `currentLag` on partitions that are not
assigned to us any more inside the consumer, leading to an `IllegalStateException`.

This bug causes flakiness of the test
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
2024-01-02 17:30:20 +01:00
Matthias J. Sax 31ab73cf1e
MINOR: avoid unnecessary UnsupportedOperationException (#15102)
We did not complete KIP-714 with regard to collecting producer client
instance IDs in Kafka Streams if EOSv1 is enabled. Instead of throwing
an UnsupportedOperationException, we should return an empty map.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2024-01-02 07:34:42 -08:00
Almog Gavra 2488fb71b9 KAFKA-16046: fix stream-stream-join store types (#15061)
Before #14648, the KStreamImplJoin class would always create non-timestamped persistent windowed stores. After that PR, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.

To fix this, we allow creating non-timestamped stores using the DslWindowParams.

Testing was done both manually as well as adding a unit test to ensure that the stores created are not timestamped. I also confirmed that the only place in the code persistentWindowStore was used before #14648 was in the StreamJoined code.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-22 15:33:45 +01:00
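To illustrate why the prepended timestamp in the KAFKA-16046 message above breaks a reader that expects plain values, here is a minimal sketch. It assumes an 8-byte timestamp prefix followed by the raw value bytes; the class and method names are hypothetical, and UTF-8 strings stand in for real serdes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; UTF-8 strings stand in for real serdes.
final class TimestampedValueSketch {

    // Assumed layout for this sketch: 8-byte timestamp, then the plain value bytes.
    static byte[] prependTimestamp(long timestamp, byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(timestamp)
                .put(plainValue)
                .array();
    }

    // Reader that expects a plain value and decodes every byte.
    static String readPlain(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Reader that knows about the prefix and skips it.
    static String readTimestamped(byte[] bytes) {
        return new String(bytes, Long.BYTES, bytes.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = prependTimestamp(42L, "value".getBytes(StandardCharsets.UTF_8));
        System.out.println(readTimestamped(stored).equals("value")); // true
        System.out.println(readPlain(stored).equals("value"));       // false
    }
}
```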
Bruno Cadonna f689e60c3a KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15044)
Kafka Streams checkpoints the wrong offset when a task is closed during
restoration. If under exactly-once processing guarantees a
TaskCorruptedException happens, the affected task is closed dirty, its
state content is wiped out and the task is re-initialized. If during
the following restoration the task is closed cleanly, the task writes
the offsets that it stores in its record collector to the checkpoint
file. Those offsets are the offsets that the task wrote to the changelog
topics. In other words, the task writes the end offsets of its changelog
topics to the checkpoint file. Consequently, when the task is
initialized again on the same Streams client, the checkpoint file is
read and the task assumes it is fully restored although the records
between the last offsets the task restored before closing clean and
the end offset of the changelog topics are missing locally.

The fix is to clear the offsets in the record collector on close.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-21 10:16:03 +01:00
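A simplified sketch of the checkpointing problem in the KAFKA-16017 message above and of the "clear on close" fix. The classes, the string partition keys, and the rule that written offsets override restored offsets are assumptions made for this sketch, not the actual Kafka Streams logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the Kafka Streams record collector.
final class RecordCollectorSketch {
    private final Map<String, Long> writtenOffsets = new HashMap<>();

    void recordWritten(String changelogPartition, long offset) {
        writtenOffsets.put(changelogPartition, offset);
    }

    Map<String, Long> offsets() {
        return new HashMap<>(writtenOffsets);
    }

    // The fix in the spirit of the commit: forget written offsets on close.
    void close() {
        writtenOffsets.clear();
    }

    // Assumed rule for this sketch: offsets written by the task take
    // precedence over offsets reached during restoration.
    static Map<String, Long> checkpoint(Map<String, Long> restoredOffsets,
                                        RecordCollectorSketch collector) {
        Map<String, Long> result = new HashMap<>(restoredOffsets);
        result.putAll(collector.offsets());
        return result;
    }

    public static void main(String[] args) {
        RecordCollectorSketch collector = new RecordCollectorSketch();
        collector.recordWritten("changelog-0", 100L); // end offset before the task was closed dirty

        Map<String, Long> restored = new HashMap<>();
        restored.put("changelog-0", 40L); // restoration only got this far

        System.out.println(checkpoint(restored, collector).get("changelog-0")); // 100
        collector.close();
        System.out.println(checkpoint(restored, collector).get("changelog-0")); // 40
    }
}
```

Without the clear, the stale offset 100 would be checkpointed and the records between 40 and 100 would be assumed restored.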
Hao Li 85cee984ac
MINOR: Fix rack-aware assignment tests (#14965)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-11 01:38:57 -08:00
Matthias J. Sax f52575b172
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14948)
Part of KIP-714.

Adds support to expose producer client instance id.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2023-12-11 00:20:01 -08:00
Matthias J. Sax fb5d45d26e
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14935)
Part of KIP-714.

Add support to collect client instance id of the global consumer.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-12-08 09:42:32 -08:00
David Jacot b96ded9859
Revert "MINOR: Add junit properties to display parameterized test names (#14687)" (#14961)
This reverts commit bdf6d46b41. We found out that this commit introduced flakiness in Streams' tests. We will revise it.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2023-12-07 23:20:03 -08:00
Hanyu Zheng 5ba7bfaa57
KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. (#14907)
Update to KIP-992.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-07 15:29:29 -08:00
Matthias J. Sax 7dabd27f8d
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14922)
Part of KIP-714.

Adds support to expose main consumer client instance id.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2023-12-07 10:39:39 -08:00
Hanyu Zheng 9d2297ad2d
KAFKA-15527: Support ResultOrder to reverseRange and reverseAll query over kv-store in IQv2 (#14906)
Update to KIP-985.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-07 08:32:16 -08:00
Alieh Saeedi 6694ea424a
KAFKA-15347: fix unit test (#14947)
Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-06 19:53:43 -08:00
A. Sophie Blee-Goldman 0ca6e3e359
MINOR: fully encapsulate user restore listener in the DelegatingRestoreListener (#14886)
Minor cleanup to make it easier to follow the restore listener logic. Currently, the KafkaStreams class tracks two restore listener fields: there is a non-final, nullable "globalRestoreListener" that holds the restore listener specified by the user (if any), and then there is a final "delegatingRestoreListener" that's used to encapsulate the null checks for the user-specified restore listener. It's a bit confusing to follow along with what each of these restore listener fields is doing and the relationship between them when they're on equal footing like this, when in reality they're more hierarchical and the DelegatingRestoreListener is actually a wrapper over the user-specified globalRestoreListener. The term "global" is also a bit misleading as it can get mixed up with global state stores, when it's really meant to be "global" in the sense that it applies to all state stores in the application.

It would be nice to just move the user listener completely inside the DelegatingRestoreListener class and then make that class static, as well as renaming the field to "userRestoreListener"

Reviewers: Matthias J. Sax <mjsax@apache.org>
2023-12-06 10:38:54 -08:00
Alok Thatikunta bdf6d46b41
MINOR: Add junit properties to display parameterized test names (#14687)
In many parameterized tests, the display name is broken. Example - `testMetadataFetch` appears as `[1] true`, `[2] false`  [link](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.clients.producer/KafkaProducerTest/) 
This is because the constant in `@ParameterizedTest`
```java
String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";
```

This PR adds a new `junit-platform.properties` file that overrides the default and adds a `{displayName}` placeholder, which shows the display name of the method.

Existing tests that override the name should work as is. The precedence rules are:

> 1. `name` attribute in `@ParameterizedTest`, if present
> 2. value of the `junit.jupiter.params.displayname.default` configuration parameter, if present
> 3. `DEFAULT_DISPLAY_NAME` constant defined in `@ParameterizedTest`

Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names

Sample test run output 
Before: `[1] true` [link](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.clients.producer/KafkaProducerTest/)
After: `testMetadataExpiry(boolean).false` [link](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14687/1/testReport/junit/org.apache.kafka.clients.producer/KafkaProducerTest/)

Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>, David Jacot <djacot@confluent.io>
2023-12-06 08:42:45 -08:00
Hao Li 6be2e5c131
KAFKA-15022: tests for HA assignor and StickyTaskAssignor (#14921)
Part of KIP-925.

Tests for HAAssignor and StickyAssignor.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-06 08:12:47 -08:00
Alieh Saeedi 9658942366
KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) (#14626)
Implements KIP-968.

Add new query type MultiVersionedKeyQuery for IQv2 supported by versioned state stores.
2023-12-06 07:56:12 -08:00
Eduwer Camacaro 83110e2d42
KAFKA-15448: Streams Standby Update Listener (KIP-988) (#14735)
Implementation for KIP-988, adds the new StandbyUpdateListener interface

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Colt McNealy <colt@littlehorse.io>
2023-12-06 01:27:38 -08:00
Hao Li f6560ab1cd
KAFKA-15022: introduce interface to control graph constructor (#14714)
Part of KIP-925.

Refactor graph construction and assignment in RackAwareAssignor to new interface.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2023-12-05 22:00:04 -08:00
Florin Akermann 4a958c6cb1
Kafka-14748: Relax non-null FK left-join requirement (#14107)
Relax non-null FK left-join requirement.

Testing Strategy: Inject extractor which returns null on first or second element.

Reviewers: Walker Carlson <wcarlson@apace.org>
2023-12-05 18:04:32 -06:00
Matthias J. Sax 45f5d0f621
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14908)
- Part of KIP-714
- Add new configs and public API for Kafka Streams
- Implement support to get admin client instance id

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Apoorv Mittal <amittal@confluent.io>, Walker Carlson <wcarlson@confluent.io>
2023-12-05 12:19:56 -08:00
ashwinpankaj f2aeff0026
KAFKA-9545: Fix Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` (#14910)
RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted does not wait to ensure that test-topic-A is deleted. The second assignment condition times out in 15sec.

We should wait for the topic to be deleted (default timeout = 30sec) and then check the assignment.

Reviewers: Walker Carlson <wcarlson@apache.org>
2023-12-05 11:16:06 -06:00
Nick Telford 20a223061c
KAFKA-14412: Better Rocks column family management (#14852)
When opening RocksDB, we were checking for an error in
`RocksDBTimestampedStore` to detect if the `keyValueWithTimestamp` CF is
missing.

The `openRocksDB` method now supports any number of column families, not
just the extra one used by `RocksDBTimestampedStore`. We now check for
the existing column families _before_ opening the database, which allows
us to create any missing column families.

Supporting automatic creation of any number of missing column families
is a pre-requisite for KIP-892: Transactional StateStores.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
2023-12-05 10:02:04 +01:00
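The "check existing column families before opening, then create the missing ones" step from the KAFKA-14412 message above boils down to a set difference, sketched here. The class and method are hypothetical and no RocksDB API is called; `keyValueWithTimestamp` is the column family named in the message, and `default` is the name of the RocksDB default column family.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical illustration; no RocksDB API is used here.
final class ColumnFamilySketch {

    // Returns the required column families that do not exist yet,
    // preserving the order in which they were requested.
    static List<String> missingFamilies(List<String> required, Collection<String> existing) {
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            if (!existing.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("default", "keyValueWithTimestamp");
        List<String> existing = Arrays.asList("default");
        System.out.println(missingFamilies(required, existing)); // [keyValueWithTimestamp]
    }
}
```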