Removing isEligibleLeaderReplicasV1Enabled so that ELR is enabled when
the MV is at least 4.1IV1. Also bump the Latest Prod MV to 4.1IV1.
Reviewers: Jun Rao <junrao@gmail.com>
Use Java to rewrite ProducerSendWhileDeletionTest with the new test infra
and move it to the client-integration-tests module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The OffsetFetch API does not support top-level errors in version 1.
Hence, the top-level error must be returned at the partition level.
Side note: It is a tad annoying that we create error responses in
multiple places (e.g. KafkaApis, GroupCoordinatorService). There was a
reason for this, but I cannot remember it.
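For illustration, a minimal sketch of mapping the group-level error onto
every partition (the generated `OffsetFetchResponseData` accessor names
used here are assumptions):
```java
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.Errors;

// Sketch only: OffsetFetch v1 has no top-level error field, so a group-level error
// is propagated to every requested partition instead.
final class OffsetFetchV1ErrorMapping {
    static void applyTopLevelError(OffsetFetchResponseData response, Errors error) {
        for (OffsetFetchResponseData.OffsetFetchResponseTopic topic : response.topics()) {
            for (OffsetFetchResponseData.OffsetFetchResponsePartition partition : topic.partitions()) {
                partition.setErrorCode(error.code());
            }
        }
    }
}
```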
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Sean Quah <squah@confluent.io>, Ken Huang <s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
ConsumerGroupDescribe with an empty group id returns a response containing a `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for an empty group id.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Previously, we could wait for up to half of the broker session timeout
for an RPC to complete, and then delay by up to half of the broker
session timeout. When taken together, these two delays could lead to
brokers erroneously missing heartbeats.
This change removes exponential backoff for heartbeats sent from the
broker to the controller. The load caused by heartbeats is not heavy,
and controllers can easily time out heartbeats when the queue length is
too long. Additionally, we now set the maximum RPC time to the length of
the broker period. This minimizes the impact of heavy load.
Reviewers: José Armando García Sancio <jsancio@apache.org>, David Arthur <mumrah@gmail.com>
The PR cherry-picks the commit below:
```
commit 1ca8779bee
Author: Apoorv Mittal <apoorvmittal10@gmail.com>
Date: Tue Jun 24 09:46:14 2025 +0100
MINOR: Correcting client error for fenced share partition (#20023)
Correct the error when SharePartition is fenced.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Sushant Mahajan
<smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
```
And selected critical changes from the following commit:
```
commit 3d4407ff9d
Author: Sushant Mahajan <smahajan@confluent.io>
Date: Mon Jun 23 23:57:15 2025 +0530
MINOR: Change exceptions for few error codes in SharePartition.
(#20020)
* The `SharePartition` class wraps the errors received from
  `PersisterStateManager` to be sent to the client.
* In this PR, we are categorizing the errors a bit better.
* Some exception messages in `PersisterStateManager` have been updated
  to show the share partition key.
* Tests have been updated wherever needed.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>
```
Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan
<smahajan@confluent.io>
Extending test coverage of authorization for streams group RPC
StreamsGroupDescribe. The RPC requires DESCRIBE GROUP and DESCRIBE TOPIC
permissions for all topics.
Reviewers: Bill Bejeck <bbejeck@apache.org>
Extending test coverage of authorization for streams group RPC
StreamsGroupHeartbeat. The RPC requires READ GROUP and DESCRIBE TOPIC
permissions for all topics. For creating internal topics, we require
CREATE TOPIC permission. If internal topic creation fails, the request
does not fail, but the status reflects this problem.
Reviewers: Bill Bejeck <bbejeck@apache.org>
This is a follow-up to https://github.com/apache/kafka/pull/19910.
`The coordinator failed to write an epoch fence transition for producer
jt142 to the transaction log with error COORDINATOR_NOT_AVAILABLE. The
epoch was increased to 2 but not returned to the client
(kafka.coordinator.transaction.TransactionCoordinator)` -- since we no
longer bump the epoch with this change, we should also update the message
so it does not say "increased", and remove the
**epochAndMetadata.transactionMetadata.hasFailedEpochFence = true** line.
In the test, the expected behavior is:
1) First append transaction to the log fails with
COORDINATOR_NOT_AVAILABLE (epoch 1)
2) We try init_pid again, this time the SINGLE epoch bump succeeds, and
the following things happen simultaneously (epoch 2)
-> Transition to COMPLETE_ABORT
-> Return CONCURRENT_TRANSACTION error to the client
3) The client retries, and there is another epoch bump; state
transitions to EMPTY (epoch 3)
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
Adds a test dependency on
[mock-oauth2-server](https://github.com/navikt/mock-oauth2-server/) for
integration tests of the OAuth layer. Also includes fixes for some
regressions that were caught by the integration tests.
Reviewers: Manikumar Reddy <manikumar@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>
When InitProducerId is handled on the transaction coordinator, the
producer epoch is incremented (so that we fence stale requests); then,
if a transaction was ongoing during this time, it is aborted. With
transaction version 2 (a.k.a. KIP-890 part 2), the abort increments the
producer epoch again (it is part of the new abort/commit protocol), so
the epoch ends up incremented twice.
In most cases this is benign, but when the epoch of the ongoing
transaction is 32766, it is incremented to 32767, the maximum value for
a short. When it is then incremented a second time, it goes negative,
causing an illegal argument exception.
To fix this, we simply avoid bumping the epoch a second time.
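For illustration, a minimal sketch of the arithmetic that triggers the
failure (not the coordinator code itself):
```java
// The producer epoch is a Java short, so two bumps from 32766 overflow it.
final class EpochOverflowSketch {
    public static void main(String[] args) {
        short ongoingEpoch = 32766;                 // epoch of the ongoing transaction
        short fenced = (short) (ongoingEpoch + 1);  // InitProducerId fencing bump -> 32767 (Short.MAX_VALUE)
        short aborted = (short) (fenced + 1);       // second bump from the TV2 abort path -> -32768
        if (aborted < 0) {
            // a negative epoch is what ultimately surfaces as the illegal argument exception
            throw new IllegalArgumentException("producer epoch overflowed to " + aborted);
        }
    }
}
```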
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
This PR uses topic IDs received in assignment (under new protocol) to
ensure that only these assigned topics are included in the consumer
metadata requests performed when the user subscribes to broker-side
regex (RE2J).
To handle the edge case of the consumer needing metadata for topic IDs
(from RE2J) and topic names (from transient topics), the approach is to
first send a request for the transient topics needed temporarily, and
once those are resolved, follow with the request for the topic IDs
needed for RE2J (the broker does not accept requests mixing names and
IDs).
These changes also fix another issue (KAFKA-18729), aimed at avoiding
iteration over the full set of assigned partitions when checking whether
a topic should be retained from the metadata response when using RE2J.
Reviewers: David Jacot <djacot@confluent.io>
Consumers can subscribe to an RE2J SubscriptionPattern that is resolved
and maintained on the server side (KIP-848). Currently, those regexes
are refreshed on the coordinator when a consumer subscribes to a new
regex, or when there is a new topic metadata image (to ensure regex
resolution stays up-to-date with existing topics).
But with
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813), the
topics matching a regex are filtered based on ACLs. This creates a new
situation, as regex resolution no longer stays up-to-date as topics
become visible (ACLs added/deleted).
This patch introduces a time-based refresh for subscribed regexes by
- Adding an internal `group.consumer.regex.batch.refresh.max.interval.ms`
config that controls the refresh interval.
- Scheduling a regex refresh when updating a regex subscription if the
latest refresh is older than the max interval.
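A minimal sketch of the refresh condition, with illustrative names (the
actual coordinator fields and config plumbing differ):
```java
// Illustrative only: decides whether a regex re-resolution should be scheduled.
final class RegexRefreshCheck {
    // Stand-in for the internal group.consumer.regex.batch.refresh.max.interval.ms value.
    private final long maxRefreshIntervalMs;

    RegexRefreshCheck(long maxRefreshIntervalMs) {
        this.maxRefreshIntervalMs = maxRefreshIntervalMs;
    }

    // Schedule a refresh while updating a regex subscription if the latest
    // resolution is older than the max interval.
    boolean shouldRefresh(long lastRefreshTimeMs, long nowMs) {
        return nowMs - lastRefreshTimeMs >= maxRefreshIntervalMs;
    }
}
```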
Reviewers: David Jacot <djacot@confluent.io>
For ShareFetch requests, the fetch happens through a DelayedShareFetch
operation. Operations that have already completed still hold a reference
to the data sent in the response. Because each operation is watched over
multiple keys (DelayedShareFetchGroupKey and
DelayedShareFetchPartitionKey), an operation completed via one watched
key is still referenced from the other watched key. This means the
memory can only be freed once a purge is triggered by
DelayedOperationPurgatory, which removes the already-completed operation
from the remaining keys.
The purge depends on the config
`ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`;
if that value is not smaller than the number of share fetch requests
whose responses can consume the broker's entire memory, the broker can
go out of memory. This could also be mitigated by lowering the fetch max
bytes per request, but that value is client-controlled, so the broker
cannot rely on it.
This PR triggers completion on both watched keys, so the
DelayedShareFetch operation is removed from both keys and the broker
memory is freed as soon as the share fetch response is sent.
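A rough sketch of the idea, assuming the purgatory's `checkAndComplete`
API and the key constructors shown here (both are assumptions, not the
exact signatures):
```java
// Sketch only: once a share fetch response is ready, check-and-complete the operation
// under both watch keys so that neither watcher list keeps holding a reference to the
// completed DelayedShareFetch (and the response data it carries).
void completeOnBothKeys(DelayedOperationPurgatory<DelayedShareFetch> purgatory,
                        String groupId, Uuid topicId, int partition) {
    purgatory.checkAndComplete(new DelayedShareFetchGroupKey(groupId, topicId, partition));
    purgatory.checkAndComplete(new DelayedShareFetchPartitionKey(topicId, partition));
}
```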
#### Testing
Tested with LocalTieredStorage, where the broker went OOM after reading
some 8040 messages before the fix, with the default configurations
mentioned in the doc
[here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
After the fix, consumption continues without any issue and the memory is
released immediately.
Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield
<aschofield@confluent.io>
The original `props.setProperty(TopicConfig.SEGMENT_MS_CONFIG,
config.logSegmentMillis.toString)` in the `KafkaMetadataLog` constructor
was accidentally removed in #19371. Add a test to ensure this property
is properly assigned.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
See Discussion:
https://github.com/apache/kafka/pull/19371#discussion_r2109549343
Make the following changes:
- Update the internal config name with the metadata prefix.
- Add a warning message for setting
`INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Use Java to rewrite PlaintextConsumerSubscriptionTest with the new test
infra and move it to the client-integration-tests module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
- Replace the deprecated `becomeLeaderOrFollower` with the
metadata-based `applyDelta` method.
- Add overloaded `topicsCreateDelta` to support custom topic name and
topicId.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Nick Guo <lansg0504@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Description:
* Replace the RPC mechanism with the KRaft mechanism for testing
activeProducerState in ReplicaManagerTest.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Move AddPartitionsToTxnManager to server module and convert to Java.
This patch moves AddPartitionsToTxnManager from the core module to the
server module, with its package updated from `kafka.server` to
`org.apache.kafka.server.transaction`. Additionally, several
configurations used by AddPartitionsToTxnManager are moved from
KafkaConfig.scala to AbstractKafkaConfig.java:
- brokerId
- requestTimeoutMs
- controllerListenerNames
- interBrokerListenerName
- interBrokerSecurityProtocol
- effectiveListenerSecurityProtocolMap
The next PR will move AddPartitionsToTxnManagerTest.scala to Java.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
This change handles rejecting non-zero sequences when there is an empty
producer ID state with TV2. The scenario is covered by the retriable
OutOfOrderSequence error.
For Transactions V2 with empty state:
✅ Only sequence 0 is allowed for new producers or after state cleanup
(new validation added).
❌ Any non-zero sequence is rejected with our specific error message.
❌ Epoch bumps still require sequence 0 (existing validation remains).
For Transactions V1 with empty state:
✅ ANY sequence number is allowed (0, 5, 100, etc.).
❌ Epoch bumps still require sequence 0 (existing validation).
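A hedged sketch of the new check, using a hypothetical helper rather
than the actual log-layer code:
```java
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

// Sketch only: with empty producer state under transactions V2, only sequence 0 is
// accepted; under V1, any first sequence is allowed. The error is retriable, so the
// producer can recover by retrying.
final class EmptyStateSequenceValidation {
    static void validate(boolean transactionV2Enabled, int firstSequence) {
        if (transactionV2Enabled && firstSequence != 0) {
            throw new OutOfOrderSequenceException(
                "Invalid sequence " + firstSequence + " for a producer with no state; expected 0.");
        }
    }
}
```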
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
The mapKey optimisation can be used in some KIP-932 RPC schemas to
improve the efficiency of some key-based accesses:
* AlterShareGroupOffsetsResponse
* ShareFetchRequest
* ShareFetchResponse
* ShareAcknowledgeRequest
* ShareAcknowledgeResponse
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Now that Kafka Brokers support Java 17, this PR makes some changes in
the core module. The changes in this PR are limited to only some Scala
files in the Core module's tests. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
To be clear, the directories targeted in this PR under the unit.kafka
module are:
- log
- network
- security
- tools
- utils
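For reference, the shape of the replacements (illustrative, not copied
from the changed tests):
```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Before: Collections/Arrays helpers; after: Java 9+ immutable factory methods.
class FactoryMethodExamples {
    List<String> beforeList = Arrays.asList("a", "b");   // or Collections.singletonList("a")
    List<String> afterList = List.of("a", "b");
    Map<String, String> afterMap = Map.of("k", "v");     // replaces Collections.singletonMap("k", "v")
    Set<String> afterSet = Set.of("a");                  // replaces Collections.singleton("a")
}
```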
Reviewers: TengYao Chi <frankvicky@apache.org>
The PR does the following:
1. Rewrite to the new test infra
2. Rewrite to Java
3. Move to clients-integration-tests
Reviewers: Ken Huang <s7133700@gmail.com>, Kuan-Po Tseng
<brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Fix to ensure protocol name comparisons in integration tests ignore case
(the group protocol from the parameter is lower case, vs the enum name,
which is upper case).
The tests were not failing, but the custom configs/expectations were not
being applied depending on the protocol (the tests' checks of
`groupProtocol.equals(CLASSIC)` would never be true).
Found all comparisons using equals against the constant name and fixed
them (not too many, luckily).
I did consider changing the protocol param that is passed to every test
(which is now lowercase), but it still seems more robust to have the
tests ignore case.
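A minimal sketch of the case-insensitive check, assuming the test
parameter carries the lowercase protocol name:
```java
import org.apache.kafka.clients.consumer.GroupProtocol;

// Sketch: the test parameter is lowercase ("classic"/"consumer") while the enum
// name is uppercase, so compare ignoring case instead of with equals().
final class GroupProtocolCheck {
    static boolean isClassic(String groupProtocol) {
        return groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name());
    }
}
```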
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
With transaction V2, the replica manager checks whether the incoming
producer request produces to a partition belonging to a transaction.
ReplicaManager figures this out by checking the producer epoch stored in
the partition log. However, the current code does not reject the produce
request if its producer epoch is lower than the stored producer epoch.
It is an optimization to reject such requests earlier instead of sending
an AddPartitionToTxn request and getting rejected in the response.
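A sketch of the early rejection, using a hypothetical helper rather than
the actual ReplicaManager code:
```java
import org.apache.kafka.common.errors.InvalidProducerEpochException;

// Sketch only: reject a transactional produce request whose producer epoch is lower
// than the epoch already recorded in the partition log, instead of paying for an
// AddPartitionsToTxn round trip that would be rejected anyway.
final class ProducerEpochCheck {
    static void checkEpoch(short requestEpoch, short storedEpoch) {
        if (requestEpoch < storedEpoch) {
            throw new InvalidProducerEpochException(
                "Produce request epoch " + requestEpoch + " is older than stored epoch " + storedEpoch);
        }
    }
}
```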
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
Hi, I've created a pull request.
jira: [KAFKA-19328](https://issues.apache.org/jira/browse/KAFKA-19328)
problem:
1. doAnswer chaining works as intended only when calls are made
sequentially. In a multithreaded environment, its behavior is
unpredictable.
2. Errors in a thread can be swallowed and not seen in the main thread.
3. A chain of 5 doAnswer calls is not enough for 100 threads; the last
answer in the chain is returned in most cases.
4. nextFetchOffset seems to be called before the doAnswer chain, so the
last values (25, 5, 26, 16) were always the ones found in the doAnswer
chain.
solution:
Delete the doAnswer chain so that the above four problems disappear.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
### About
Replaced `.close` calls with `try-with-resources` for a few tests in
`DelayedShareFetchTest` where we needed to use `mockStatic`.
### Testing
The code has been tested by running the unit tests.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Adding support for the `urn:ietf:params:oauth:grant-type:jwt-bearer`
grant type (AKA `jwt-bearer`). Includes further refactoring of the
existing OAuth layer and the addition of a generic JWT assertion layer
that can be leveraged in the future.
This constitutes the main piece of the JWT Bearer grant type support.
Forthcoming commits/PRs will include improvements for both the
`client_credentials` and `jwt-bearer` grant types in the following
areas:
* Integration test coverage (KAFKA-19153)
* Unit test coverage (KAFKA-19308)
* Top-level documentation (KAFKA-19152)
* Improvements to and documentation for `OAuthCompatibilityTool`
(KAFKA-19307)
Reviewers: Manikumar Reddy <manikumar@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>
---------
Co-authored-by: Zachary Hamilton <77027819+zacharydhamilton@users.noreply.github.com>
Co-authored-by: Lianet Magrans <98415067+lianetm@users.noreply.github.com>
- Previously, read and write share state requests were allowed on
uninitialized share partitions (share partitions on which
initializeState has NOT been called). This should not be the case.
- This PR addresses the concern by adding error checks on read and
write. Other requests are allowed (initialize, readSummary, alter).
- Refactored `ShareCoordinatorShardTest` to reduce redundancy and added
some new tests.
- Some request/response classes have also been reformatted.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Before 4.1, api key 74 was `ListClientMetricsResources`. Since 4.1, it
is `ListConfigResources`. If a user sends a v0 ListConfigResources
request to the broker, the metrics do not record the request as
`ListClientMetricsResources`. This PR records the
`ListClientMetricsResources` metric when the request is a v0
`ListConfigResources`.
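A hedged sketch of the idea; the surrounding metrics plumbing is
assumed, only the two names come from the text:
```java
// Sketch only: api key 74 is recorded under its pre-4.1 name when the request is v0.
final class ListConfigResourcesMetricName {
    static String requestMetricName(short apiVersion) {
        return apiVersion == 0 ? "ListClientMetricsResources" : "ListConfigResources";
    }
}
```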
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
This refactor improves the implementation of `awaitNonEmptyRecords` by:
- Removing the unreachable `throw new IllegalStateException` statement,
which was dead code due to `pollRecordsUntilTrue` throwing exceptions on
timeout.
- Eliminating the use of `return` inside the lambda, which relies on
non-local returns that can be confusing and error-prone in Scala.
Reviewers: Yung <yungyung7654321@gmail.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
---------
Co-authored-by: Jing-Jia Hung <jing@Jing-JiadeiMac.local>
Following the removal of the ZK-to-KRaft migration code in commit
85bfdf4, controller-to-broker communication is now handled by the
control-plane listener (`controller.listener.names`). The
`interBrokerListenerName` parameter in `ClusterControlManager` is no
longer referenced on the controller side and can be safely removed as
dead code.
Reviewers: Lan Ding <isDing_L@163.com>, Ken Huang <s7133700@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
Removed the FetchResponse#of overload that is not used in production.
The test cases that originally invoked this method have been updated to
call the other
[FetchResponse#of](6af849f864/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java (L232)),
which is currently used by `KafkaApis`, to maintain the integrity of the
tests.
Reviewers: Jun Rao <junrao@gmail.com>, PoAn Yang <payang@apache.org>,
Chia-Ping Tsai <chia7712@gmail.com>
Use Java to rewrite BaseConsumerTest and SaslPlainPlaintextConsumerTest
with the new test infra and move them to the client-integration-tests
module. The BaseConsumerTest is still used, so we should not remove it
yet.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<frankvicky@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Commit 57ae6d6706 had mistakenly removed the `@Test` annotation from a
test. Adding it back.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>
1. Move `LogReadResult` to server module.
2. Rewrite `LogReadResult` in Java.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, TengYao Chi <frankvicky@apache.org>
This PR rewrites `ConsumerWithLegacyMessageFormatIntegrationTest.scala`
in Java and moves it to the `clients-integration-tests` module.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
* Return a "resource doesn't exist" message when users try to describe a
non-existent resource in kafka-configs.sh and kafka-client-metrics.sh.
* For the groups type, the command checks both existing groups and
non-existent groups that have a dynamic config. If it cannot find a
group under either condition, it returns the "resource doesn't exist"
message.
Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew
Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Use ClusterTest and Java to rewrite `EndToEndClusterIdTest` and move it
to the server module.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Add integration tests for alter share group offsets API.
Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Sushant
Mahajan <smahajan@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>
### About
Added test memory records to avoid the silent exception thrown during
slicing.
### Testing
Ran the tests of `DelayedShareFetchTest` to make sure that there is no
silent exception in any test.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
[KIP-1143](https://cwiki.apache.org/confluence/x/LwqWF) replaced
`Optional<String> listenerName()` with `String listener()`.
This patch migrates the usage from the deprecated method to the new one.
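The shape of the migration, assuming `org.apache.kafka.common.Endpoint`
is the affected class:
```java
import org.apache.kafka.common.Endpoint;

// Sketch of the migration described above.
final class ListenerAccessorMigration {
    static String listenerOf(Endpoint endpoint) {
        // Before (deprecated): Optional<String> name = endpoint.listenerName();
        // After (KIP-1143):
        return endpoint.listener();
    }
}
```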
Reviewers: Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>, Yung <yungyung7654321@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>
Now that Kafka Brokers support Java 17, this PR makes some changes in
the core module. The changes in this PR are limited to only the Scala
files in the Core module's tests. The unit tests module is still pending
and will follow next. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
To be clear, the file targeted in this PR is:
KafkaApisTest.scala
Reviewers: PoAn Yang <payang@apache.org>, Ming-Yen Chung
<mingyen066@gmail.com>, Yung <yungyung7654321@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>
This patch updates the OffsetFetch API to ensure that a committed offset
is returned iff the requested topic id matches the persisted one; the
invalid offset is returned otherwise.
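A minimal sketch of the check (illustrative helper; treating -1 as the
invalid offset sentinel is an assumption here):
```java
import org.apache.kafka.common.Uuid;

// Sketch only: return the committed offset iff the requested topic id matches the
// persisted one; otherwise return the invalid offset sentinel.
final class CommittedOffsetLookup {
    private static final long INVALID_OFFSET = -1L;

    static long committedOffset(Uuid requestedTopicId, Uuid persistedTopicId, long committedOffset) {
        return requestedTopicId.equals(persistedTopicId) ? committedOffset : INVALID_OFFSET;
    }
}
```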
Reviewers: Lianet Magrans <lmagrans@confluent.io>