This PR fixes the time comparison logic in
`CoordinatorRuntime#maybeFlushCurrentBatch` to ensure that the batch is
flushed when the elapsed time since `appendTimeMs` exceeds the
`appendLingerMs` parameter.
This issue is also mentioned [here](
https://github.com/apache/kafka/pull/20653/files#r2442452104).
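As a rough sketch, the corrected check looks something like the following; `currentBatch`, `appendTimeMs`, and `flushCurrentBatch()` are assumptions taken from the description above, not the actual code:
```java
// Sketch only: flush once the elapsed time since the batch's append time
// reaches appendLingerMs.
private void maybeFlushCurrentBatch(long currentTimeMs) {
    if (currentBatch != null
            && currentTimeMs - currentBatch.appendTimeMs >= appendLingerMs) {
        flushCurrentBatch();
    }
}
```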
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
…ugins if it has the correct version
Reviewers: Greg Harris <gharris1727@gmail.com>, Fiore Mario Vitale <mvitale86@gmail.com>, Snehashis Pal <snehashisp1995@gmail.com>
Kafka Streams does not catch `Error` types thrown during
`GlobalStreamThread` initialization, so such errors cannot be traced
(for example, an `ExceptionInInitializerError` when RocksDB is not
found for a global store), because they are never caught and logged.
The catch block in `GlobalStreamThread#initialize()` now catches
`Throwable` instead of `Exception`. Additionally, the empty
`setUncaughtHandler` operation that prevented this from taking effect
when users employed `setUncaughtExceptionHandler` has been removed.
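A hedged sketch of the widened catch block; the logger, `startupException` field, and surrounding structure are assumed:
```java
// Sketch: catch Throwable so Errors such as ExceptionInInitializerError
// (e.g. RocksDB missing for a global store) are logged and surfaced.
try {
    // ... initialize global state stores (may load RocksDB JNI classes) ...
} catch (final Throwable t) { // previously: catch (final Exception e)
    log.error("Exception caught during initialization of GlobalStreamThread", t);
    startupException = new StreamsException("Initialization failed", t);
}
```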
Reviewers: Matthias J. Sax <matthias@confluent.io>
The group coordinator has been having issues with unknown errors. The
theory is that this is caused by optimistic compression estimates which
cause unchecked batch overflows when trying to write.
This PR adds a check for uncompressed record size to flush batches more
eagerly and avoid overfilling partially-full batches. This should make
the group coordinator errors less frequent.
Also added tests to ensure this change does not impact desired behavior
for large compressible records.
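A sketch of the eager-flush idea; the batch bookkeeping fields here are hypothetical names, not the actual implementation:
```java
// Sketch: use the uncompressed size as a conservative bound instead of the
// optimistic compression estimate; flush before the batch can overflow.
if (currentBatchUncompressedBytes + newRecordUncompressedBytes > maxBatchSizeBytes) {
    flushCurrentBatch(); // start a fresh batch for the new record
}
```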
Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
Before we added caching for consumer next offsets, we called
`mainConsumer.position` and always expected something back. When we
added the caching, we kept the check that we always have a next offset,
but since the logic changed to fetching the offsets from poll, we may
not have anything for topics that have no messages. This PR accounts
for that.
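A minimal sketch of the fallback, assuming a hypothetical `nextOffsets` cache keyed by partition:
```java
// Sketch: the cache is filled from poll(), so an empty topic may have no
// entry; fall back to position() instead of assuming a cached value exists.
final Long cached = nextOffsets.get(partition);
final long nextOffset = cached != null
    ? cached
    : mainConsumer.position(partition); // Consumer#position(TopicPartition)
```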
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
<matthias@confluent.io>
Updates the Javadoc for
`org.apache.kafka.clients.consumer.OffsetResetStrategy` to clearly state
that it is deprecated for removal with no replacement.
Reviewers: Matthias J. Sax <matthias@confluent.io>
*What*
https://issues.apache.org/jira/browse/KAFKA-19485
**Bug:**
There is a bug in `ShareConsumeRequestManager` where we add
acknowledgements on the initial `ShareSession` epoch even after
checking for it.
The fix only includes acknowledgements in the request when we have to.
The PR also adds the check at another point in the code where we could
potentially send such acknowledgements. One such case is when metadata
is refreshed with empty topic IDs after a broker restart, meaning
leader information is not available on the node:
- Consumer subscribed to a partition whose leader was node-0.
- Broker restart happens and node-0 is elected leader again. Broker
starts a new `ShareSession`.
- Background thread sends a fetch request with **non-zero** epoch.
- Broker responds with `SHARE_SESSION_NOT_FOUND`.
- Client updates session epoch to 0 once it receives this error.
- Client updates metadata but receives empty metadata response. (Leader
unavailable)
- The application thread processing the previous fetch completes and
sends acks to piggyback on the next fetch.
- The next fetch sends the piggybacked acknowledgements for the
previously subscribed partitions, resulting in an error from the broker
("`Acknowledge data present on initial epoch`"). (Currently we attempt
to send them even if the leader is unavailable.)
**Fix**: Before sending out acknowledgements, check whether we are on
an initial epoch.
Added unit test covering the above scenario.
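A sketch of the added guard; the builder and field names here are assumptions based on the description:
```java
// Sketch: skip piggybacked acknowledgements while the share session is on
// its initial epoch (0); the broker rejects acks on a brand-new session.
if (sessionEpoch != 0) { // 0 = initial ShareSession epoch
    fetchRequestBuilder.setAcknowledgements(pendingAcknowledgements);
} else {
    // hold the acknowledgements until the session is established
}
```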
Reviewers: Andrew Schofield <aschofield@confluent.io>
When a push telemetry request fails, any exception is treated as
fatal, increasing the time interval to `Integer.MAX_VALUE` and
effectively turning telemetry off. This PR updates the error handling
to check whether the exception is a transient one with expected
recovery, and keeps the telemetry interval unchanged in those cases
since a recovery is expected.
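A sketch of the distinction, using `RetriableException` from `org.apache.kafka.common.errors`; the `pushIntervalMs` field is an assumption:
```java
// Sketch: keep the push interval for transient failures; only a
// non-retriable failure disables telemetry.
if (exception instanceof org.apache.kafka.common.errors.RetriableException) {
    // transient: leave pushIntervalMs unchanged, a later push may succeed
} else {
    pushIntervalMs = Integer.MAX_VALUE; // effectively turns telemetry off
}
```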
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Matthias Sax
<mjsax@apache.org>
Clear `pendingTasksToInit` when tasks are cleared. This matters when we
shut down a thread in the PARTITIONS_ASSIGNED state. In this case we
may have locked some unassigned task directories (see
`TaskManager#tryToLockAllNonEmptyTaskDirectories`) and then been
assigned one or more of those tasks. In this scenario, we will not
release the locks for the unassigned task directories (see
`TaskManager#releaseLockedUnassignedTaskDirectories`), because
`TaskManager#allTasks` includes `pendingTasksToInit`, which hasn't been
cleared.
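A sketch of the fix; the other collections in `clear()` are assumed:
```java
// Sketch: clear pendingTasksToInit together with the other task
// collections, so releaseLockedUnassignedTaskDirectories() no longer sees
// the stale tasks via allTasks() and can release the directory locks.
void clear() {
    activeTasks.clear();        // assumed existing cleanup
    standbyTasks.clear();       // assumed existing cleanup
    pendingTasksToInit.clear(); // the previously missing call
}
```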
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy
<lbrutschy@confluent.io>
StreamsRebalanceListenerInvoker was implemented to match the behavior
of ConsumerRebalanceListenerInvoker; however, StreamsRebalanceListener
has a subtly different interface than ConsumerRebalanceListener: it
does not throw exceptions, but returns them as an Optional.
In the interest of consistency, this change fixes the mismatch by
changing the StreamsRebalanceListener interface to behave more like
ConsumerRebalanceListener, throwing exceptions directly.
As another minor fix, the StreamsRebalanceListenerInvoker now simply
skips callback execution instead of throwing an IllegalStateException
when no streamRebalanceListener is defined. This can happen when the
consumer is closed before Consumer.subscribe is called.
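A sketch of the interface change; `Assignment` and `TaskId` stand in for the actual nested types, and the method names are assumed from the description:
```java
// Before: failures were returned rather than thrown, e.g.
//   Optional<Exception> onTasksAssigned(Assignment assignment);
// After: callbacks throw directly, like ConsumerRebalanceListener.
public interface StreamsRebalanceListener {
    void onTasksAssigned(Assignment assignment);   // may throw
    void onTasksRevoked(java.util.Set<TaskId> tasks);
    void onAllTasksLost();
}
```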
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>
Backport #20629 to the 4.1 branch.
Simplify the last known ELR update logic, making it more robust.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Move the rollback out of the lock: if the persister returns a completed
future for the write state, the same data-plane-request-handler thread
should not call the purgatory's safeTryAndComplete while holding
SharePartition's write lock.
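A sketch of the reordering; the lock and helper names follow the description and are otherwise assumptions:
```java
// Sketch: run the rollback (and any purgatory completion it triggers)
// only after releasing SharePartition's write lock, so a completed
// persister future cannot re-enter the purgatory while the lock is held.
lock.writeLock().lock();
try {
    stageRollback(); // record what must be rolled back, under the lock
} finally {
    lock.writeLock().unlock();
}
applyRollback(); // may call the purgatory's safeTryAndComplete, lock-free
```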
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
<adixit@confluent.io>
In the consumer, we invoke the rebalance listener's onPartitionsRevoked
or onPartitionsLost callbacks when the consumer closes. The point is
that the application may want to commit, or wipe its state if we are
closing unsuccessfully.
In the StreamsRebalanceListener, we did not implement this behavior,
which means that when closing the consumer we may lose some progress
and, in the worst case, also miss that we have to wipe our local state
since we got fenced.
In this PR we implement StreamsRebalanceListenerInvoker, very similar
to ConsumerRebalanceListenerInvoker, and invoke it in Consumer.close.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>, TengYao Chi <frankvicky@apache.org>,
Uladzislau Blok <123193120+UladzislauBlok@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Cherry-pick commit from
https://github.com/apache/kafka/commit/96ef1c520a
During stress testing it was noticed that, on acquisition lock timeout,
some offsets were not found in the cache. The cache can be updated by
different acknowledgement calls, so if there is an ongoing transition
that has not yet finished and a parallel acknowledgement triggers a
cache update, the cache can be updated incorrectly while the first
transition is still in flight.
Since the cache is only updated for Archived and Acknowledged records,
this issue in the existing implementation should not hamper the queue's
functionality. But it might update the cache early when the persister
call fails, and in some scenarios it triggers "offset not found in
cache" error logs when the acquisition lock times out.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
This PR fixes a leak where StreamsMetricsImpl did not remove
store-level metrics correctly, thus leaking objects.
Reviewers: Eduwer Camacaro <eduwerc@gmail.com>, Bill Bejeck
<bbejeck@apache.org>
```
commit ec37eb538b (HEAD ->
KAFKA-19719-cherry-pick-41, origin/KAFKA-19719-cherry-pick-41)
Author: Kevin Wu <kevin.wu2412@gmail.com>
Date: Thu Sep 25 11:56:16 2025 -0500
KAFKA-19719: --no-initial-controllers should not assume
kraft.version=1 (#20551)
Just because a controller node sets --no-initial-controllers flag
does not mean it is necessarily running kraft.version=1. The more
precise meaning is that the controller node being formatted does not
know what kraft version the cluster should be in, and therefore it
is only safe to assume kraft.version=0. Only by setting
--standalone, --initial-controllers, or --no-initial-controllers AND
not specifying the controller.quorum.voters static config, is it
known kraft.version > 0.
For example, it is a valid configuration (although confusing) to run
a static quorum defined by controller.quorum.voters but have all
the controllers format with --no-initial-controllers. In this
case, specifying --no-initial-controllers alongside a metadata
version that does not support kraft.version=1 causes formatting
to fail, which is a regression.
Additionally, the formatter should not check the kraft.version
against the release version, since kraft.version does not actually
depend on any release version. It should only check the
kraft.version against the static voters config/format arguments.
This PR also cleans up the integration test framework to match the
semantics of formatting an actual cluster.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Kuan-Po Tseng
<brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, José Armando
García Sancio <jsancio@apache.org>

Conflicts:
core/src/main/scala/kafka/tools/StorageTool.scala
    Minor conflicts. Keep changes from cherry-pick.
core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
    Remove auto-join tests, since 4.1 does not support it.
docs/ops.html
    Keep docs section from cherry-pick.
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
    Minor conflicts. Keep cherry-picked changes.
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
    Conflicts due to integration test framework changes. Keep new changes.
commit 02d58b176c (upstream/4.1)
```
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR performs a refactoring of LockUtils and improves inline
comments, as a follow-up to https://github.com/apache/kafka/pull/19961.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
https://issues.apache.org/jira/browse/KAFKA-19390
The AbstractIndex.resize() method does not release the old memory map
for both index and time index files. In some cases, Mixed GC may not
run for a long time, which can cause the broker to crash when the
vm.max_map_count limit is reached.
The root cause is that safeForceUnmap() was not being called on Linux
within resize(), so we have changed the code to unmap the old mmap on
all operating systems.
The same problem was reported in
[KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the
PR submitted at that time did not acquire all necessary locks around the
mmap accesses and was closed without fixing the issue.
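A simplified sketch of the change inside `resize()`, with locking elided and the `raf` (RandomAccessFile) and size variables assumed:
```java
// Sketch: release the old mapping on every OS before remapping, so stale
// mmaps cannot accumulate until vm.max_map_count is exhausted.
safeForceUnmap(); // previously skipped on Linux, relying on GC instead
raf.setLength(roundedNewSize); // resize the underlying index file
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
```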
Reviewers: Jun Rao <junrao@gmail.com>
- Fix typo in `process.role`
- Fix formatting of quorum description commands
Reviewers: Lan Ding <isDing_L@163.com>, Ken Huang <s7133700@gmail.com>,
TengYao Chi <frankvicky@apache.org>
Update the KRaft dynamic voter set documentation. In Kafka 4.1, we
introduced a powerful new feature that enables seamless migration from a
static voter set to a dynamic voter set.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.
Small intervals degrade loading times for non-transactional offset
commit workloads while large intervals degrade loading times for
transactional workloads. A default of 16,384 was chosen as a compromise.
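A sketch of the loading loop, with the snapshot-registry calls assumed rather than taken from the actual patch:
```java
// Sketch: every 16,384 records, advance to a fresh snapshot and drop the
// older ones so discarded nested Timeline collections become unreachable.
if (++recordsLoaded % 16_384 == 0) {
    snapshotRegistry.getOrCreateSnapshot(lastOffset); // assumed API
    snapshotRegistry.deleteSnapshotsUpTo(lastOffset); // assumed API
}
```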
Cherry pick of d067c6c040.
Reviewers: David Jacot <djacot@confluent.io>
Cherry-pick KAFKA-19546 to 4.1.
During an online downgrade, when a static member using the consumer
protocol, which is also the last member using the consumer protocol, is
replaced by another static member using the classic protocol with the
same instance id, the latter takes over the assignment of the former
and an online downgrade is triggered.
In the current implementation, if the replacing static member has a
different subscription, no rebalance is triggered when the downgrade
happens. This patch checks whether the static member has changed its
subscription and triggers a rebalance when it has.
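A sketch of the added check; the member accessors and rebalance helper are assumptions:
```java
// Sketch: when the replacing static member's subscription differs from
// the member it replaces, trigger a rebalance instead of silently
// reusing the old assignment.
if (!newMember.subscribedTopicNames().equals(oldMember.subscribedTopicNames())) {
    prepareRebalance(group, "Static member rejoined with a changed subscription");
}
```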
Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
Fix to avoid flakiness in the verifiable producer system test. The test
lists running processes and greps to find the VerifiableProducer one,
but wasn't providing a specific pattern to grep (so it was flaky if
more than one process matched the default grep pattern "kafka").
Fix by passing a "proc_grep_string" to filter when looking for the
VerifiableProducer process.
All tests pass successfully after the change.
Reviewers: PoAn Yang <payang@apache.org>, Andrew Schofield
<aschofield@confluent.io>
We need to pass in only the reset strategy, as the `logMessage`
parameter was removed.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lucas Brutschy
<lbrutschy@confluent.io>
Querying the oldest-open-iterator metric can result in a
NoSuchElementException when the last open iterator is removed, due to
a race condition between the query and the metric update.
To avoid this race condition, this PR caches the metric result instead
of accessing the list of open iterators directly. We don't need to
clear this cache, because the entire metric is removed when the last
iterator is removed.
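A sketch of the cached gauge; the field and metric names are assumptions:
```java
// Sketch: report a cached value instead of reading the (possibly
// just-emptied) list of open iterators inside the gauge, which raced
// with removal of the last iterator.
private volatile long oldestOpenIteratorStartTimestamp;

// gauge registered once; reads only the cached field
metrics.addMetric(oldestIteratorMetricName,
    (Gauge<Long>) (config, now) -> oldestOpenIteratorStartTimestamp);
```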
Reviewers: Matthias J. Sax <matthias@confluent.io>
With "merge.repartition.topic" optimization enabled, Kafka Streams tries
to push repartition topics upstream, to be able to merge multiple
repartition topics from different downstream branches together.
However, it is not safe to push a repartition topic if the parent node
is value-changing: because of potentially changing data types, the
topology might become invalid, and fail with serde error at runtime.
The optimization itself work correctly, however, processValues() is not
correctly declared as value-changing, what can lead to invalid
topologies.
Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy
<lbrutschy@confluent.io>
It looks like we are checking for properties that are not guaranteed
under at_least_once, for example, exact counting (not allowing for
overcounting).
This change relaxes the validation constraint:
The TAGG topic effectively contains count-by-count results. For
example, given the input without duplication 0 -> 1,2,3, TAGG will
contain 3 -> 1, since 1 key had 3 values. With duplication, the input
0 -> 1,1,2,3 yields 4 -> 1 in TAGG, since 1 key had 4 values.
This makes the result difficult to compare. Since we also run the smoke
test with exactly_once, I propose to disable validation of TAGG under
ALOS.
Similarly, the AVG topic may overcount or undercount. The test case is
extremely similar to DIF, both performing a join on two streams, with
the only difference being the mathematical operation performed, so we
can also disable this validation under ALOS with minimal loss of
coverage.
Finally, the change fixes a bug that would throw an NPE when validation
of a windowed stream failed.
Reviewers: Kirk True <kirk@kirktrue.pro>, Matthias J. Sax
<matthias@confluent.io>
The previous URL http://lambda-architecture.net/ seems to now be
controlled by spammers.
Co-authored-by: Shashank <hsshashank.grad@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>