Commit Graph

16067 Commits

Author SHA1 Message Date
Federico Valeri f2cbc7e3f2
MINOR: KafkaRaftClient cleanup (#20161)
Various minor cleanups for the KRaft client.

---------

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>

Reviewers: Luke Chen <showuon@gmail.com>
2025-07-17 10:41:28 +08:00
Gaurav Narula 7e9df7d03d
KAFKA-19505: allow mocking UnifiedLog#topicId in ReplicaManagerTest (#20167)
The mocked value for `UnifiedLog#topicId` was set up incorrectly, which
caused a test failure.

Reviewers: Luke Chen <showuon@gmail.com>, PoAn Yang <payang@apache.org>, Satish Duggana <satishd@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-17 10:40:00 +08:00
Tsung-Han Ho (Miles Ho) 70824be92a
KAFKA-19501 Update OpenJDK base image from buster to bullseye (#20165)
The changes update the OpenJDK base image from 17-buster to 17-bullseye:
- Updates tests/docker/Dockerfile to use openjdk:17-bullseye instead of
  openjdk:17-buster
- Updates tests/docker/ducker-ak script to use the new default image
- Updates documentation in tests/README.md with the new image name
  examples

Reviewers: Federico Valeri <fedevaleri@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
2025-07-17 03:27:07 +08:00
Ming-Yen Chung e3276ae029
KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
* Coordinator starts with a smaller buffer, which can grow as needed.

* In freeCurrentBatch, release the appropriate buffer:
  * The Coordinator recycles the expanded buffer
    (`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because
    `MemoryBuilder` may allocate a new `ByteBuffer` if the existing one
    isn't large enough.

  * There are two cases in which the buffer may exceed `maxMessageSize`:
    1. If there's a single record whose size exceeds `maxMessageSize` (which,
       so far, is derived from `max.message.bytes`) and the write is in
       `non-atomic` mode, it's still possible for the buffer to grow beyond
       `maxMessageSize`. In this case, the Coordinator should revert to using a
       smaller buffer afterward.
    2. The Coordinator does not recycle a buffer that is larger than
       `maxMessageSize`. If the user dynamically reduces `maxMessageSize` to a
       value even smaller than `INITIAL_BUFFER_SIZE`, the Coordinator should
       avoid recycling any buffer larger than `maxMessageSize` so that it can
       allocate the smaller buffer in the next round.

* Add tests to verify the above scenarios.
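
The recycling rule above can be sketched roughly as follows. This is an illustration only, not the coordinator's code: the class `BufferRecyclerSketch`, its methods, and the value chosen for `INITIAL_BUFFER_SIZE` are assumptions made for the example.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the recycling rule; not the actual coordinator runtime.
final class BufferRecyclerSketch {
    // Assumed value, chosen only for illustration.
    static final int INITIAL_BUFFER_SIZE = 512 * 1024;

    // Buffer kept for the next batch, or null if none was recycled.
    private ByteBuffer cached;

    // Reuse the recycled buffer if there is one, otherwise start small.
    ByteBuffer acquire(int maxMessageSize) {
        ByteBuffer buffer = cached;
        cached = null;
        if (buffer == null) {
            buffer = ByteBuffer.allocate(Math.min(INITIAL_BUFFER_SIZE, maxMessageSize));
        }
        buffer.clear();
        return buffer;
    }

    // Keep the (possibly expanded) buffer only if it fits within maxMessageSize.
    // Returns true if the buffer was recycled.
    boolean release(ByteBuffer buffer, int maxMessageSize) {
        if (buffer.capacity() <= maxMessageSize) {
            cached = buffer;
            return true;
        }
        return false; // oversized: drop it so the next round allocates a smaller one
    }
}
```

In this sketch an oversized buffer is simply left for garbage collection, which is one way to get the "revert to a smaller buffer afterward" behaviour described above.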

Reviewers: David Jacot <djacot@confluent.io>, Sean Quah
<squah@confluent.io>, Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-16 22:06:33 +08:00
Sanskar Jhajharia 65a9337739
MINOR: Add ShareFetch quota session verification test (#20164)
### Background
As part of KIP-932 implementation, ShareFetch requests need to properly
integrate with Kafka's quota system. This requires that ShareFetch
requests extract and pass the correct session information (Principal,
client address, client ID) to quota managers, ensuring consistent quota
enforcement between ShareFetch and traditional Fetch requests.

### Changes
This PR adds `testHandleShareFetchRequestQuotaTagsVerification()`,
`testHandleShareAcknowledgeRequestQuotaTagsVerification` and
`testHandleShareFetchWithAcknowledgementQuotaTagsVerification` to
`KafkaApisTest`, which provides verification of quota tag extraction and
session handling for ShareFetch and ShareAcknowledge requests.
   - Ensures ShareFetch/ShareAck requests are properly constructed with
the correct client ID, principal, client address, and API key
   - Verifies the request context contains the expected session
information
   - Uses `ArgumentCaptor` to capture the exact `Session` and
`RequestChannel.Request` objects passed to quota managers
   - Verifies both `quotas.fetch.maybeRecordAndGetThrottleTimeMs()` and
`quotas.request.maybeRecordAndGetThrottleTimeMs()` are called with
correct parameters as and when needed.
   - Validates that the captured `RequestChannel.Request` object
maintains the correct request context information
   - Ensures the client ID passed to quota managers matches the
test-defined value
   - Verifies that in case of Acks being piggybacked on the fetch
requests, the quotas are applied only once and not twice.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-07-16 09:56:01 +01:00
Chang-Chi Hsu 2658f25238
KAFKA-19305 Make ClientQuotaImage and TopicImage immutable (#19847)
- Updated `ClientQuotaImage` and `TopicImage` by using
`Collections.unmodifiableMap` or `ImmutableMap` to prevent accidental or
intentional mutations after construction.
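
As a rough illustration of the approach (not the actual `ClientQuotaImage` or `TopicImage` code), a class can wrap its map with `Collections.unmodifiableMap` at construction. The class name `ImmutableImageSketch` and the defensive copy are assumptions made for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical image-like class; the real classes may differ in detail.
final class ImmutableImageSketch {
    private final Map<String, Double> quotas;

    ImmutableImageSketch(Map<String, Double> quotas) {
        // Copy first so later changes to the caller's map are not visible here
        // (an assumption of this sketch), then expose a read-only view.
        this.quotas = Collections.unmodifiableMap(new HashMap<>(quotas));
    }

    Map<String, Double> quotas() {
        return quotas;
    }

    // True if the given map refuses writes.
    static boolean rejectsMutation(Map<String, Double> map) {
        try {
            map.put("x", 1.0);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```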

Reviewers: Alyssa Huang <ahuang@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-07-16 11:50:18 +08:00
Masahiro Mori daece61a50
MINOR: Refactor LockUtils and improve comments (follow up to KAFKA-19390) (#20131)
This PR performs a refactoring of LockUtils and improves inline
comments, as a follow-up to https://github.com/apache/kafka/pull/19961.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
2025-07-15 10:07:01 -07:00
lucliu1108 7ea32a0e93
KAFKA-19459: List internal topics for the user (#20157)
For the Kafka Streams group commands, if delete topic requests fail due
to a version mismatch, the user has to remove the topics manually by
first retrieving the relevant internal topics.

To assist the user, the internal topic names are now included as part of
the error message, so that the user can delete the internal topics
associated with this application directly.

Reviewers: TengYao Chi <frankvicky@apache.org>, Alieh Saeedi
<asaeedi@confluent.io>
2025-07-15 11:52:35 +08:00
Bill Bejeck dd82542493
KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166)
The `AdminClient` adds a telemetry reporter to the metrics reporters
list in the constructor.  The problem is that the reporter was already
added in the `createInternal` method.  In the `createInternal` method
call, the `clientTelemetryReporter` is added to a
`List<MetricReporters>` which is passed to the `Metrics` object and
gets closed when `Metrics.close()` is called.  But the reporters list
populated in the constructor is not used by the `Metrics` object, so
the reporter added there doesn't get closed, causing a memory leak.

All related tests pass after this change.

Reviewers: Apoorv Mittal <apoorvmittal10@apache.org>, Matthias J. Sax
 <matthias@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>,
 Jhen-Yung Hsu <jhenyunghsu@gmail.com>
2025-07-14 20:19:16 -04:00
Matthias J. Sax ffcfc974d9
KAFKA-19842: Fix flaky KafkaStreamsTelemetryIntegrationTest (#20147)
The new "streams" protocol behaves slightly differently from the "classic"
protocol, and thus we need to update the test to avoid race conditions.
In particular, if we need to create internal topics, the first call to
`poll()` won't "block" and return after task assignment has completed;
instead it returns early without a task assignment, and only a subsequent
rebalance will assign tasks.

This implies that KafkaStreams transitions to RUNNING state even if the
group is still in NOT_READY state broker side, but this NOT_READY state
is not reflected in the client side state machine.

Disabling the combination of "complex-topology + streams" for now,
until this difference in behavior of the client state machine is fixed.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-07-14 17:13:54 -07:00
Xuan-Zhang Gong c6cf5175f6
MINOR: Add a description document for batchLength (#20140)
Add documentation for Batch Format to explain the meaning of
batchLength.

This is the preview image after the change:


![image](https://github.com/user-attachments/assets/85023c48-64e6-4a33-898f-df84f6864e58)

Reviewers: Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-15 05:51:24 +08:00
Sanskar Jhajharia 9f092420f1
MINOR: Extend Quota Tests for ShareFetch requests (#20163)
### Summary
Extends RequestQuotaTest to include ShareFetch API quota testing,
ensuring compliance with KIP-932.

### Key Changes
- New test: testShareFetchUsesSameFetchSensor() - Verifies ShareFetch
and Fetch use the same FETCH quota sensor
- New test:
testResponseThrottleTimeWhenBothShareFetchAndRequestQuotasViolated() -
Tests ShareFetch throttling behaviour
- Request builder: Added ApiKeys.SHARE_FETCH case with proper ShareFetch
request construction
- Some minor cleanup wrt use of Collections

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-07-14 21:28:25 +01:00
Kevin Wu a64f5bf6ab
KAFKA-19254 Add generic feature level metrics (#20021)
This PR adds the following metrics for each of the supported production
features (`metadata.version`, `kraft.version`, `transaction.version`,
etc.):

`kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=X`

`kafka.server:type=node-metrics,name=maximum-supported-level,feature-name=X`

`kafka.server:type=node-metrics,name=minimum-supported-level,feature-name=X`

Reviewers: Josep Prat <josep.prat@aiven.io>, PoAn Yang
 <payang@apache.org>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Lan Ding
 <isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-15 04:27:04 +08:00
Lan Ding 6437135bc0
KAFKA-19451: fix flaky test RemoteIndexCacheTest.testCacheEntryIsDeletedOnRemoval() (#20085)
**Problem Description**
In the `RemoteIndexCache.cleanup()` method, the asynchronous invocation
of `index.deleteIfExists()` may cause a conflict. When the
`getIndexFileFromRemoteCacheDir()` method is executed, it utilizes
`Files.walk()` to traverse all files in the directory path. If
`index.deleteIfExists()` is triggered during this traversal, a
`NoSuchFileException` will be thrown.

**Solution**
To resolve this issue, ensure that `index.deleteIfExists()` has been
fully executed before invoking `getIndexFileFromRemoteCacheDir()`.
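
The ordering described in the solution can be sketched as follows, assuming the asynchronous deletion is represented by a `CompletableFuture`. The class `WalkAfterDeleteSketch`, its methods, and the file names are hypothetical; `RemoteIndexCache` itself is not used here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

// Hypothetical sketch: wait for an asynchronous delete before walking the directory.
final class WalkAfterDeleteSketch {
    static long countFilesAfterDeletion(Path dir, Path toDelete) {
        CompletableFuture<Void> deletion = CompletableFuture.runAsync(() -> {
            try {
                Files.deleteIfExists(toDelete);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        // Block until the delete has finished so the traversal cannot race with it.
        deletion.join();
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates a temporary directory with two files, deletes one, counts the rest.
    static long demo() {
        try {
            Path dir = Files.createTempDirectory("walk-sketch");
            Files.createFile(dir.resolve("keep.index"));
            Path stale = Files.createFile(dir.resolve("stale.index"));
            return countFilesAfterDeletion(dir, stale);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```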

Reviewers: Jun Rao <junrao@gmail.com>
2025-07-14 12:01:50 -07:00
Rajani K a61a37f7dd
KAFKA-19452: Fix flaky test LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments (#20121)
The `testHWCheckpointWithFailuresMultipleLogSegments` test in
`LogRecoveryTest` was failing intermittently due to a race condition
during its failure simulation.

In successful runs, the follower broker would restart and rejoin the
In-Sync Replica (ISR) set before the old leader's failure was fully
processed. This allowed for a clean and timely leader election to the
now in-sync follower.

However, in the failing runs, the follower did not rejoin the ISR before
the leader election was triggered. With no replicas in the ISR and
unclean leader election disabled by default for the test, the controller
correctly refused to elect a new leader, causing the test to time out.

This commit fixes the flakiness by overriding the controller
configuration for this test to explicitly enable unclean leader
election. This allows the out-of-sync replica to be promoted to leader,
making the test deterministic and stable.

Reviewers: Jun Rao <junrao@gmail.com>
2025-07-14 09:42:00 -07:00
xijiu 873379873e
KAFKA-19435 Optimize `kafka-consumer-groups.sh` to return the offset info when some partitions without leaders (#20064)
1. Optimize the corresponding logic in the `ConsumerGroupCommand` by
first checking if a leader exists for the partition before invoking
`admin.listOffsets`. Finally, concatenate the data and return it.
2. Add an integration test that creates a cluster with 3 brokers, then
shuts down a broker and checks whether the output meets expectations.

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
 <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-07-14 22:13:01 +08:00
Luke Chen e1ff387605
KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)
This PR enables reading remote storage for multiple partitions in one
fetchRequest. The main changes are:
1. In `DelayedRemoteFetch`, we accept multiple remoteFetchTasks and
other metadata now.
2. In `DelayedRemoteFetch`, we'll wait until all remoteFetch tasks are
done, whether they succeeded or failed.
3. In `ReplicaManager#fetchMessage`, we'll create one
`DelayedRemoteFetch` and pass multiple remoteFetch metadata to it, and
watch all of them.
4. Added tests

Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, Satish Duggana <satishd@apache.org>
2025-07-14 19:42:08 +05:30
Lucas Brutschy 29cf97b9ad
KAFKA-19478 [2/N]: Remove task pairs (#20127)
Task pairs is an optimization that is enabled in the current sticky task
assignor.

The basic idea is that every time we add a task A to a client that has
tasks B, C, we add pairs (A, B) and (A, C) to a global collection of
pairs. When adding a standby task, we then prioritize creating standby
tasks that create new task pairs. If this does not work, we fall back to
the original behavior.

The complexity of this optimization is fairly significant, and its
usefulness is questionable. The HighAvailabilityAssignor does not seem
to have such an optimization, and the absence of this optimization does
not seem to have caused any problems that I know of. I could not find
any explanation of what this optimization is actually trying to achieve.

A side effect of it is that we will sometimes avoid “small loops”, such
as

        Node A: ActiveTask1, StandbyTask2
        Node B: ActiveTask2, StandbyTask1
        Node C: ActiveTask3, StandbyTask4
        Node D: ActiveTask4, StandbyTask3

With small loops like this, losing two nodes will in the worst case
cause 2 tasks to go down, so the assignor prefers

        Node A: ActiveTask1, StandbyTask4
        Node B: ActiveTask2, StandbyTask1
        Node C: ActiveTask3, StandbyTask2
        Node D: ActiveTask4, StandbyTask3

which is a “big loop” assignment, where losing two nodes will in the
worst case cause at most 1 task to be unavailable. However, this
optimization seems fairly niche, and the current implementation does not
seem to implement it in a direct form, but as a more relaxed constraint
which usually, but not always, avoids small loops. So it remains unclear
whether this is really the intention behind the optimization. The
current unit tests of the StickyTaskAssignor pass even after removing
the optimization.
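
The worst-case numbers for the two layouts can be checked by brute force over every pair of failed nodes. The sketch below is only an illustration: the class `LoopFailureSketch` and its encoding (each node as the set of task ids it hosts, active or standby) are assumptions and have nothing to do with the assignor's own data structures.

```java
import java.util.List;
import java.util.Set;

// Hypothetical brute-force check of the two example assignments.
final class LoopFailureSketch {
    // Each set holds the task ids hosted on one node, active or standby.
    static final List<Set<Integer>> SMALL_LOOPS =
        List.of(Set.of(1, 2), Set.of(2, 1), Set.of(3, 4), Set.of(4, 3));
    static final List<Set<Integer>> BIG_LOOP =
        List.of(Set.of(1, 4), Set.of(2, 1), Set.of(3, 2), Set.of(4, 3));

    // Largest number of tasks with no surviving copy when any two nodes fail.
    static int worstCaseLost(List<Set<Integer>> nodes, int taskCount) {
        int worst = 0;
        for (int i = 0; i < nodes.size(); i++) {
            for (int j = i + 1; j < nodes.size(); j++) {
                int lost = 0;
                for (int task = 1; task <= taskCount; task++) {
                    boolean alive = false;
                    for (int n = 0; n < nodes.size(); n++) {
                        if (n != i && n != j && nodes.get(n).contains(task)) {
                            alive = true;
                        }
                    }
                    if (!alive) {
                        lost++;
                    }
                }
                worst = Math.max(worst, lost);
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseLost(SMALL_LOOPS, 4)); // prints 2
        System.out.println(worstCaseLost(BIG_LOOP, 4));    // prints 1
    }
}
```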

The pairs optimization has a worst-case quadratic space and time
complexity in the number of tasks, and makes a lot of other optimizations
impossible, so I’d suggest we remove it. I don’t think, in its current
form, it is suitable to be implemented in a broker-side assignor. Note,
however, that if we identify a useful effect of the code in the future, we
can work on finding an efficient algorithm that can bring the
optimization to our broker-side assignor.

This reduces the runtime of our worst case benchmark by 10x.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-07-14 09:13:51 +02:00
yangxuze b6fce13e3a
KAFKA-19460: Improve documentation of fetch.min.bytes and replica.fetch.min.bytes (#20111)
While walking through the source code I confirmed that the broker checks
`replica.fetch.min.bytes` exactly the same way it checks
`fetch.min.bytes`, so this patch updates the wording for both config
keys.

Co-authored-by: yangxuze <xuze_yang@163.com>

Reviewers: Luke Chen <showuon@gmail.com>
2025-07-14 12:57:15 +08:00
Apoorv Mittal 986322dc36
MINOR: Moving the rollback out of lock in share partition (#20153)
Move the rollback out of the lock: if the persister returns a completed
future for write state, the same data-plane-request-handler thread should
not call purgatory safeTryAndComplete while holding SharePartition's
write lock.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
 <adixit@confluent.io>
2025-07-11 15:22:03 +01:00
Alieh Saeedi c058c134d2
KAFKA-19496: Deflake streams admin api describe test (#20154)
This fixes the flaky

`DescribeStreamsGroupTest.testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions()`,
which sometimes fails due to `ERROR stream-thread Missing source topics:
Source topics customInputTopic2 are missing`

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-07-11 15:48:24 +02:00
Luke Chen 2346c0e737
KAFKA-19495: update native image config for native images (#20150)
We failed the native image build and test workflow

[here](https://github.com/apache/kafka/actions/runs/16211393417/job/45772104969).
The failed messages are:
```
Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.kafka.server.config.AbstractKafkaConfig.<clinit>(AbstractKafkaConfig.java:56)
    at java.base@21.0.2/java.lang.Class.ensureInitialized(DynamicHub.java:601)
    at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:79)
    at scala.Option.flatMap(Option.scala:283)
    at kafka.tools.StorageTool$.execute(StorageTool.scala:79)
    at kafka.tools.StorageTool$.main(StorageTool.scala:46)
    at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:57)
    at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
    at java.base@21.0.2/java.lang.invoke.LambdaForm$DMH/sa346b79c.invokeStaticInit(LambdaForm$DMH)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever for configuration sasl.oauthbearer.jwt.retriever.class: Class org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:778)
    at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:1271)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:155)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:198)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:237)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:399)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:412)
    at org.apache.kafka.common.config.internals.BrokerSecurityConfigs.<clinit>(BrokerSecurityConfigs.java:197)
    ... 9 more
```
After investigation, I found we have to update the native image configs
to support the new code change as described

[here](https://github.com/apache/kafka/blob/trunk/docker/native/README.md#native-image-reachability-metadata).
This PR fixes this issue and verified that the same workflow for native
image passed

[here](https://github.com/apache/kafka/actions/runs/16215454627/job/45783738496).

The PR for v4.1.0 is https://github.com/apache/kafka/pull/20151 .

Reviewers: TengYao Chi <frankvicky@apache.org>
2025-07-11 17:26:28 +08:00
Sanskar Jhajharia 27383970b6
MINOR: Cleanup Connect Module (1/n) (#19869)
Now that Kafka supports Java 17, this PR makes some changes in the connect
module. The changes in this PR are limited to only some files. Future
PRs will follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
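
For reference, the replacements listed above yield collections that compare equal to the old ones. The sketch below only demonstrates that with standard JDK calls; the class name `CollectionFactorySketch` is hypothetical and not part of the Connect module.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only; not code from this PR.
final class CollectionFactorySketch {
    // True if every old-style call yields a collection equal to its replacement.
    static boolean replacementsAreEqual() {
        return Collections.emptyList().equals(List.of())
            && Collections.singletonList("a").equals(List.of("a"))
            && Arrays.asList("a", "b").equals(List.of("a", "b"))
            && Collections.emptyMap().equals(Map.of())
            && Collections.singletonMap("k", "v").equals(Map.of("k", "v"))
            && Collections.singleton("a").equals(Set.of("a"));
    }

    // List.of rejects null elements, unlike Arrays.asList.
    static boolean listOfRejectsNull() {
        try {
            List.of((String) null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

One behavioural difference worth keeping in mind when making this kind of swap: `List.of`, `Map.of` and `Set.of` reject null elements and are fully immutable, whereas `Arrays.asList` accepts nulls and allows `set()` on existing positions.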

Sub modules targeted: api, basic-auth-extensions, file, json, mirror,
mirror-client

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-11 00:35:41 +08:00
Shivsundar R 56a3c6dde9
KAFKA-19485: Added check before sending acknowledgements on initial epoch. (#20135)
https://issues.apache.org/jira/browse/KAFKA-19485

**Bug :**
There is a bug in `ShareConsumeRequestManager` where we are adding
acknowledgements on the initial `ShareSession` epoch even after checking
for it.
Added a fix to only include acknowledgements in the request if we have to.
PR also adds the check at another point in the code where we could
potentially be sending such acknowledgements.  One of the cases could be
when metadata is refreshed with empty topic IDs after a broker restart.
This means leader information would not be available on the node.

- Consumer subscribed to a partition whose leader was node-0.
- Broker restart happens and node-0 is elected leader again. Broker
starts a new `ShareSession`.
- Background thread sends a fetch request with **non-zero** epoch.
- Broker responds with `SHARE_SESSION_NOT_FOUND`.
- Client updates session epoch to 0 once it receives this error.
- Client updates metadata but receives empty metadata response. (Leader
unavailable)
- Application thread processing the previous fetch, completes and sends
acks to piggyback on next fetch.
- Next fetch will send the piggyback acknowledgements on the fetch for
previously subscribed partitions resulting in error from broker
("`Acknowledge data present on initial epoch`"). (Currently we attempt
to send even if leader is unavailable).

**Fix** :  Add a check before sending out acknowledgments if we are on
an initial epoch.
Added unit test covering the above scenario.
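
The guard can be pictured with a minimal sketch like the one below. It is an assumption-laden illustration rather than the `ShareConsumeRequestManager` code: the class `AckEpochSketch`, the method `acksToInclude`, and the use of strings for acknowledgements are all hypothetical, and the sketch does not model what the client does with acknowledgements it leaves out.

```java
import java.util.List;

// Hypothetical guard; not the actual ShareConsumeRequestManager logic.
final class AckEpochSketch {
    // The scenario above resets the session epoch to 0 after SHARE_SESSION_NOT_FOUND.
    static final int INITIAL_EPOCH = 0;

    // Piggyback acknowledgements only when the session is past its initial epoch.
    static List<String> acksToInclude(int sessionEpoch, List<String> pendingAcks) {
        if (sessionEpoch == INITIAL_EPOCH) {
            return List.of();
        }
        return pendingAcks;
    }
}
```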

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-07-10 09:06:19 +01:00
Xiao Yang ded7df9707
MINOR: fix docker_release example (#19427)
Fix docker_release example.
Currently, the command doesn't display correctly

Reviewers: TengYao Chi <frankvicky@apache.org>, PoAn Yang
 <payang@apache.org>, Yung <yungyung7654321@gmail.com>, Ken Huang
 <s7133700@gmail.com>
2025-07-10 12:41:21 +08:00
Xuan-Zhang Gong 2f6ea81d0a
KAFKA-19488: Update the docs of "if-not-exists" (#20133)
"the action will only execute" is incorrect, as the admin still sends
the request. The "if-not-exists" flag is actually used to swallow the
exception

Reviewers: TengYao Chi <frankvicky@apache.org>, Nick Guo
<lansg0504@gmail.com>, Ken Huang <s7133700@gmail.com>
2025-07-10 10:26:06 +08:00
Jinhe Zhang c625b44d8c
MINOR: Throw exceptions if source topic is missing (#20123)
In the old protocol, Kafka Streams used to throw a
`MissingSourceTopicException` when a source topic is missing. In the new
protocol, it no longer does that and only logs the status returned from
the broker, which indicates that a source topic is missing.

This change:
1. Throws a `MissingSourceTopicException` when a source topic is missing
2. Adds unit tests
3. Modifies integration tests to fit both old and new protocols

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-07-09 21:19:12 +02:00
Andrew Schofield 7b8a594a22
MINOR: Tidy up in AlterShareGroupOffsetsHandler (#20130)
Minor tidying up in AlterShareGroupOffsetsHandler based on review
comment
https://github.com/apache/kafka/pull/20049#discussion_r2192904850.

Reviewers: Jimmy Wang <wangzhiwang611@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
 <s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping
 Tsai <chia7712@gmail.com>
2025-07-10 01:24:13 +08:00
Chang-Chi Hsu 22698493e9
MINOR: Move partitions == 0 logic from waitForTopic to waitTopicDeletion (#20108)
## Changes

- The partitions == 0 branch has been moved from **waitForTopic** to
**waitTopicDeletion**.

## Reasons

- Clarifying the responsibility of each helper method makes the test code
easier to reason about. This is done by moving the partitions == 0 logic
from **waitForTopic** into a dedicated method **waitTopicDeletion**.

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu
 <tjwu1217@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-10 01:10:20 +08:00
Jhen-Yung Hsu 007fe6e92a
KAFKA-19466 LogConcurrencyTest should close the log when the test completes (#20110)
- Fix testUncommittedDataNotConsumedFrequentSegmentRolls() and
testUncommittedDataNotConsumed(), which call createLog() but never close
the log when the tests complete.
- Move LogConcurrencyTest to the Storage module and rewrite it in Java.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-07-10 01:01:42 +08:00
Lucas Brutschy dabde76ebf
KAFKA-19477: Sticky Assignor JMH Benchmark (#20118)
The current assignor used in KIP-1071 is verbatim the assignor used on
the client-side. The assignor performance was not a big concern on the
client-side, and it seems some additional performance overhead has crept
in during the adaptation to the broker-side interfaces, so we expect it
to be too slow for groups of non-trivial size.

We base ourselves on the share-group parameters for these benchmarks:

 - Up to 1000 members
 - Up to 100 topics
 - Up to 100 partitions per topic

Note, however, that the parameters influencing the Streams assignment
are different and more complicated compared to regular consumer groups /
share consumer groups. The assignment logic is independent of the number
of topics, but depends on the number of subtopologies. A subtopology may
read from multiple topics. We simplify this relationship by assuming one
topic per subtopology. Members may be part of the same process or
separate processes. We introduce a parameter membersPerProcess to tune
two extreme configurations (1, 50).

We define 50% of the subtopologies to be stateful. Stateful
subtopologies get standby replicas assigned, if enabled. For example, if
we have 100 subtopologies with 100 partitions, we get 10,000 active
tasks and 5,000 standby tasks. 
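
The task counts in that example follow from simple multiplication, sketched below. The class and method names are hypothetical, and the single standby replica per stateful task is an assumption chosen because it matches the 5,000 figure.

```java
// Hypothetical helper reproducing the arithmetic of the example above.
final class BenchmarkTaskCountSketch {
    // One topic per subtopology, so each subtopology has one task per partition.
    static int activeTasks(int subtopologies, int partitionsPerTopic) {
        return subtopologies * partitionsPerTopic;
    }

    // Only stateful subtopologies get standby replicas.
    static int standbyTasks(int subtopologies, int partitionsPerTopic,
                            int statefulPercent, int standbyReplicas) {
        int statefulSubtopologies = subtopologies * statefulPercent / 100;
        return statefulSubtopologies * partitionsPerTopic * standbyReplicas;
    }
}
```

With 100 subtopologies, 100 partitions, 50% stateful and 1 standby replica (assumed), this gives 100 × 100 = 10,000 active tasks and 50 × 100 × 1 = 5,000 standby tasks.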

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-07-09 13:58:03 +02:00
José Armando García Sancio e42e01eec3
KAFKA-19184: Add documentation for upgrading the kraft version (#20071)
Update the documentation to describe how to upgrade the kraft feature
version from 0 to 1.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Alyssa Huang
<ahuang@confluent.io>
2025-07-09 11:20:47 +02:00
Ming-Yen Chung ff4d951027
KAFKA-17715 Remove argument force_use_zk_connection from kafka_acls_cmd_with_optional_security_settings (#19209)
The e2e tests currently cover version 2.1.0 and above. Thus, we can
remove `force_use_zk_connection` in
`kafka_acls_cmd_with_optional_security_settings`

In contrast, the `force_use_zk_connection` in
`kafka_topics_cmd_with_optional_security_settings` and
`kafka_configs_cmd_with_optional_security_settings` still needs to be
kept as `kafka-topics.sh` does not support `--bootstrap-server` in 2.1
and 2.2

e2e test result:
```
===========================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-07-02--001
run time:         200 minutes 28.399 seconds
tests run:        90
passed:           90
flaky:            0
failed:           0
ignored:          0
===========================================
```

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-09 17:07:56 +08:00
Sushant Mahajan 8aa5eae2f9
KAFKA-19457: Make share group init retry interval configurable. (#20104)
* While creating share group init requests  in
`GroupMetadataManager.shareGroupHeartbeat`,  we check for topics in
`initializing` state and if they are a certain amount of time old, we
issue retry requests for the same.
* The interval for considering initializing topics as old was based on
`offsetsCommitTimeoutMs` and was not configurable.
* In this PR, we remedy the situation by introducing a new config to
supply the value. The default is `30_000` which is a
heuristic based on the fact that the share coordinator `persister`
retries requests with exponential backoff, with upper cap of `30_000`
seconds.
* Tests have been updated wherever applicable.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>, Andrew Schofield
 <aschofield@confluent.io>
2025-07-09 09:52:58 +01:00
Abhinav Dixit e489682c45
KAFKA-19450: ShareConsumerPerformance does not handle exceptions from consumeMessagesForSingleShareConsumer (#20126)
### About
Within `ShareConsumerPerformance.java`, all the share consumers run
within an executorService object. When we call
`executorService.submit()`, we do not store the returned future, and an
exception would only be surfaced when we call future.get(). I believe
this is a shortcoming in `ShareConsumerPerformance.java` which needs to
be improved.
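
The general behaviour is standard `java.util.concurrent`: an exception thrown inside a submitted task is captured in the `Future` and only reaches the caller through `get()`. The sketch below shows that in isolation; the class `SubmitExceptionSketch` and the method `failureOf` are hypothetical and unrelated to the actual tool code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demonstration; not ShareConsumerPerformance itself.
final class SubmitExceptionSketch {
    // Returns the message of the exception thrown by the task, or null if it succeeded.
    static String failureOf(Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Keep the future: without it the task's exception is never observed.
            Future<?> future = executor.submit(task);
            try {
                future.get(); // rethrows the task's exception wrapped in ExecutionException
                return null;
            } catch (ExecutionException e) {
                return e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```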

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-07-09 09:51:05 +01:00
Gaurav Narula 36b9bb94f1
KAFKA-19474 Move WARN log on log truncation below HWM (#20106)
#5608 introduced a regression where the check for `targetOffset <
log.highWatermark`
to emit a `WARN` log was made incorrectly after truncating the log.

This change moves the check for `targetOffset < log.highWatermark`  to
`UnifiedLog#truncateTo` and ensures we emit a `WARN` log on truncation
below  the replica's HWM by both the `ReplicaFetcherThread` and
`ReplicaAlterLogDirsThread`

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-07-09 09:55:02 +08:00
Jonah Hooper d86ba7f54a
KAFKA-18681: Created GetReplicaLogInfo RPCs (#19664)
Creates GetReplicaLogInfoRequest and GetReplicaLogInfoResponse RPCs.
Information returned by these brokers will be used to aid
unclean-recovery by selecting the longest logs.

Reviewers: Alyssa Huang <ahuang@confluent.io>, Calvin Liu <caliu@confluent.io>, Colin P. McCabe <cmccabe@apache.org>, TaiJuWu <tjwu1217@gmail.com>
2025-07-08 10:41:01 -07:00
Masahiro Mori ea7b145860
KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files (#19961)
https://issues.apache.org/jira/browse/KAFKA-19390

The AbstractIndex.resize() method does not release the old memory map
for both index and time index files.  In some cases, Mixed GC may not
run for a long time, which can cause the broker to crash when the
vm.max_map_count limit is reached.

The root cause is that safeForceUnmap() is not being called on Linux
within resize(), so we have changed the code to unmap old mmap on all
operating systems.

The same problem was reported in
[KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the
PR submitted at that time did not acquire all necessary locks around the
mmap accesses and was closed without fixing the issue.

Reviewers: Jun Rao <junrao@gmail.com>
2025-07-08 09:15:32 -07:00
Alieh Saeedi db1c6f31a3
KAFKA-18288: Fix Streams CLI describe (#20099)
This PR includes the following fixes:

- Streams CLI used to list the groups and return the description of the
first group only, which is a bug. With this fix, it returns the
descriptions of the groups specified by `--group` or `all-groups`.
Integration tests are added to verify the fix.
- `timeoutOption` was missing in describe groups. This fix adds it and
tests it with a short timeout.
- `DescribeStreamsGroupsHandler` used to return an empty group in `DEAD`
state when the group id was not found, but with this fix, it throws
`GroupIdNotFoundException`.
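
The behaviour change in the last item can be illustrated with a minimal sketch. This is not the actual handler code: `DescribeSketch`, `describeOrThrow`, and the map of group states are hypothetical, and the exception class below is a local stand-in so the example compiles without Kafka on the classpath.

```java
import java.util.Map;

// Local stand-in for Kafka's GroupIdNotFoundException (assumption: defined
// here only to keep the sketch self-contained).
class GroupIdNotFoundException extends RuntimeException {
    GroupIdNotFoundException(String message) {
        super(message);
    }
}

// Hypothetical helper, not the real DescribeStreamsGroupsHandler.
final class DescribeSketch {
    // Looks up the state of a group. An unknown group id is reported as an
    // error instead of being turned into a placeholder group in DEAD state.
    static String describeOrThrow(Map<String, String> groupStates, String groupId) {
        String state = groupStates.get(groupId);
        if (state == null) {
            throw new GroupIdNotFoundException("Group " + groupId + " not found.");
        }
        return state;
    }
}
```

With this shape, a caller can tell a missing group apart from a group that really is in `DEAD` state.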
2025-07-08 15:28:56 +02:00
Lucas Brutschy a88fd01e74
KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120)
This is a very mechanical and obvious change that makes most
accessors in ProcessState constant time O(1), instead of linear time
O(n), by computing the collections and aggregations at insertion time,
instead of every time the value is accessed.

Since the accessors are used in deeply nested loops, this reduces the
runtime of our worst case benchmarks by ~14x.
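
A minimal sketch of the pattern described above, assuming a simplified state object. `PrecomputedProcessState` and its fields are hypothetical and do not mirror the real ProcessState; the point is only that aggregates are updated on insertion, so accessors do no iteration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ProcessState; names are illustrative only.
final class PrecomputedProcessState {
    private final Map<String, Integer> tasksPerMember = new HashMap<>();
    private final Set<String> assignedTasks = new HashSet<>();
    private int totalTasks = 0; // running aggregate, updated on insertion

    // All bookkeeping happens here, once per insertion.
    void addTask(String memberId, String taskId) {
        if (assignedTasks.add(taskId)) { // ignore duplicate task ids
            tasksPerMember.merge(memberId, 1, Integer::sum);
            totalTasks++;
        }
    }

    // O(1): returns the running counter instead of summing over members.
    int totalTasks() {
        return totalTasks;
    }

    // O(1): a single map lookup.
    int taskCount(String memberId) {
        return tasksPerMember.getOrDefault(memberId, 0);
    }

    // O(1): read-only view of the maintained set, no copy is made.
    Set<String> assignedTasks() {
        return Collections.unmodifiableSet(assignedTasks);
    }
}
```

The trade-off is slightly more work per insertion in exchange for cheap reads, which pays off when the accessors sit inside nested loops.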

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-07-08 13:32:47 +02:00
Jhen-Yung Hsu dde0b8cd92
MINOR: Prevent unnecessary test runs - KAFKA-19042 follow-up (#20122)
PlaintextConsumerTest should extend AbstractConsumerTest instead of
BaseConsumerTest. Otherwise, those tests will be executed in both
`clients-integration-tests` and `core` (see
https://github.com/apache/kafka/pull/20081/files#r2190749592).

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-07-08 07:42:15 +08:00
Mickael Maison a3ed705092
MINOR: Fix build warning in Streams (#20098)
When building Streams I get this warning:
```
> Task :streams:compileTestJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note:
<PATH>/kafka/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
uses unchecked or unsafe operations.
```

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-08 02:44:12 +08:00
Ken Huang a399852ced
KAFKA-19042 Move PlaintextConsumerTest to client-integration-tests module (#20081)
Rewrite PlaintextConsumerTest in Java using the new test infra and move
it to the client-integration-tests module.

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-08 01:41:59 +08:00
Ismael Juma 4b607616c7
KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
This fixes compatibility between Apache Kafka 4.x and librdkafka
versions older than the recently released 2.11.0 when Kerberos
authentication is used.

Even though this is a bug in librdkafka, a key goal of KIP-896 is not to
break the popular client libraries listed in it. Adding back JoinGroup
v0 & v1 is a very small change and worth it from that perspective.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-07-07 08:44:24 -07:00
Bolin Lin e8ee7fc210
KAFKA-19315 Move ControllerMutationQuotaManager to server module (#19807)
Migrate ControllerMutationQuotaManager to Java implementation and move
to server module, including ClientQuotaManager and associated files.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-07 01:55:38 +08:00
Ken Huang d31885d33c
MINOR: Use <code> block instead of backtick (#20107)
When writing HTML, it's recommended to use the <code> element instead of
backticks for inline code formatting.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
2025-07-06 14:49:51 +08:00
Omnia Ibrahim 9df616da76
KAFKA-19397: Ensure consistent metadata usage in produce request and response (#19964)
- Metadata doesn't have the full view of the topic name to id mapping
during client rebootstrap or when a topic has been deleted and
recreated. The solution is to pass down the topic id and stop trying to
figure it out later in the logic.

---------

Co-authored-by: Kirk True <kirk@kirktrue.pro>
2025-07-04 17:44:09 +01:00
Andrew Schofield da4fbba279
KAFKA-19468: Ignore unsubscribed topics when computing share assignment (#20101)
When the group coordinator is processing a heartbeat from a share
consumer, it must decide whether to recompute the assignment. Part of
this decision hinges on whether the assigned partitions match the
partitions initialised by the share coordinator. However, when the set
of subscribed topics changes, there may be initialised partitions which
are not currently assigned. Topics which are not subscribed should be
omitted from the calculation about whether to recompute the assignment.
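
The check described above can be sketched as follows. This is an illustration under assumptions, not the coordinator's code: `ShareAssignmentCheck`, `needsRecompute`, and the topic-name-to-partition maps are hypothetical simplifications.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; the real group coordinator logic is more involved.
final class ShareAssignmentCheck {
    // Returns true when the assigned partitions no longer match the
    // initialised partitions of the topics that are currently subscribed.
    // Initialised partitions of unsubscribed topics are ignored.
    static boolean needsRecompute(Set<String> subscribedTopics,
                                  Map<String, Set<Integer>> initialisedPartitions,
                                  Map<String, Set<Integer>> assignedPartitions) {
        Map<String, Set<Integer>> relevant = initialisedPartitions.entrySet().stream()
            .filter(e -> subscribedTopics.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return !relevant.equals(assignedPartitions);
    }
}
```

Without the filter, a topic that was initialised earlier but is no longer subscribed would make the two maps differ on every heartbeat and trigger needless recomputation.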

Co-authored-by: Sushant Mahajan <smahajan@confluent.io>

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Sushant
 Mahajan <smahajan@confluent.io>, Apoorv Mittal
 <apoorvmittal10@gmail.com>
2025-07-04 14:55:19 +01:00
Andrew Schofield 860853dba2
KAFKA-19363: Finalize heterogeneous simple share assignor (#20074)
Finalise the share group SimpleAssignor for heterogeneous subscriptions.
The assignor code is much more accurate about the number of partitions
assigned to each member, and the number of members assigned for each
partition. It eliminates the idea of hash-based assignment because that
has been shown to be unhelpful. The revised code is much more
effective at assigning evenly as the number of members grows and shrinks
over time.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-07-04 10:35:31 +01:00
Jhen-Yung Hsu 4e31e270ba
MINOR: Update the docs for Metrics (#20094)
Update the outdated Javadocs in Metrics.java. The
`MetricName(String name, String group)` constructor in MetricName.java
was removed in 59b918ec2b. Minor typo fixes included.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-07-04 02:01:29 +08:00