Use Java to rewrite PlaintextConsumerPollTest by new test infra and move
it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
- Remove redundant close of `snapshotWriter`.
- `snapshotWriter` is already closed by `RaftSnapshotWriter#writer`.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Fix the change in
https://github.com/apache/kafka/pull/19380/files#r2102980872
Use `LinkedHashMap` instead of `Set` to preserve the order of elements.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Sanskar Jhajharia
<sjhajharia@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
* Add new functions `listConfigResources(Set<ConfigResource.Type>
configResourceTypes, ListConfigResourcesOptions options)` and
`listConfigResources()` to `Admin` interface.
* New functions can list all kind of config resource types.
* If input is a set with a type other than `CLIENT_METRICS` and
request version is 0, return `UnsupportedVersionException`.
* Deprecate functions
`listClientMetricsResources(ListClientMetricsResourcesOptions options)`
and `listClientMetricsResources()`.
* Deprecate classes `ListClientMetricsResourcesResult` and
`ClientMetricsResourceListing`.
* Change `ClientMetricsCommand` to use `listConfigResources`.
* Add integration tests to `PlaintextAdminIntegrationTest.java`.
* Add unit tests to `KafkaAdminClientTest.java`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
### About
Consumer groups have a different timeout `REMOTE_FETCH_MAX_WAIT_MS_PROP`
in delayed fetch purgatory for fetch requests having remote storage
fetch ([code
link](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1669)).
This is done before the request enters the purgatory, so its easy to
change. At the moment share groups can only have a `waitTimeMs` `of
shareFetch.fetchParams().maxWaitMs` (default value `500ms`) for delayed
share fetch purgatory regardless of whether they are remote
storage/local log fetch.
This PR introduces a way to increase the timeout of remote storage fetch
requests if a remote storage fetch request couldn't complete within
`shareFetch.fetchParams().maxWaitMs`, then we create a timer task which
can be interrupted whenever `pendingFetches` is finished. The change has
been done to avoid the expiration of remote storage share fetch
requests.
### Testing
The code has been tested with the help of unit tests and
`LocalTieredStorage.java`
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Unit tests added for the class GraphNode.
Change applied to GraphNode parentNodes() function to return a copy of
the collection, which is consistent with how the children() collection
is returned.
Reviewers: Bill Bejeck <bbejeck@apache.org>
---------
Co-authored-by: Lorcan <lorcanjames1@gmail.com>
Now that Kafka Brokers support Java 17, this PR makes some changes in
core module. The changes in this PR are limited to only the Scala files
in the Core module's tests. The unit tests module is still pending. It
shall follow next. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
To be clear, the directories being targeted in this PR are:
- core/src/test/scala/kafka
- core/src/test/scala/integration/kafka
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Gradle 8.14 starts to support Java 24, so we should update the Gradle
version accordingly.
Reviewers: TengYao Chi <kitingiao@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Now that Kafka supports Java 17, this PR updates the shell module to get
rid of older code. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
- Arrays.asList() are replaced with List.of()
- Replace switch statement with switch expression
Reviewers: PoAn Yang <payang@apache.org>, Yung
<yungyung7654321@gmail.com>, Ken Huang <s7133700@gmail.com>, TengYao Chi
<frankvicky@apache.org>
The test incorrectly used force-terminate instead of
forceTerminateTransaction
Reviewers: Justine Olshan <jolshan@confuent.io>, Kuan-Po Tseng
<brandboat@gmail.com>, Ken Huang <s7133700@gmail.com>
### Issue:
API Responses missing latest version in [Kafka protocol
guide](https://kafka.apache.org/protocol.html)
#### For example:
These are missing:
- ApiVersions Response (Version: 4) — Only versions 0–3 are documented,
though version 4 of the request is included.
- DescribeTopicPartitions Response — Not listed at all.
- Fetch Response (Version: 17) — Only versions 4–16 are documented,
though version 17 of the request is included.
#### After the fix:
docs/generated/protocol_messages.html
<img width="1045" alt="image"
src="https://github.com/user-attachments/assets/5ea79ced-aab5-4c47-8e09-9956047c9bf1"
/>
Reviewers: dengziming <dengziming1993@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Now that Kafka Brokers support Java 17, this PR updates the share
coordinator module to get rid of older code. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
- Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Reviewers: Andrew Schofield <aschofield@confluent.io>
Use Java to rewrite `PlaintextConsumerCommitTest` by new test infra and
move it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
Replace `TopicMetadata` with `MetadataImage` in
`SubscribedTopicDescriberImpl` and `TargetAssignmentBuilder`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<djacot@confluent.io>
This PR is based on the discussion here:
https://github.com/apache/kafka/pull/18929#discussion_r2083238645
Currently, the handle methods related to `shared‐group` requests are
inconsistent: some return `Unit` while others return
`CompletableFuture[Unit]` without a clear rationale. This PR
standardizes all of them to return `CompletableFuture[Unit]` and ensures
consistent error handling by chaining `.exceptionally(handleError)` to
each call site.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Enable next system test with KIP-1071.
Some of the validation inside the test did not make sense for KIP-1071.
This is because in KIP-1071, if a member leaves or joins the group, not
all members may enter a REBALANCING state. We use the wrapper introduced
in [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271)
to print a log line whenever the member epoch is bumped, which is the
only way a member can "indirectly" observe that other members are
rebalancing.
Reviewers: Bill Bejeck <bill@confluent.io>
In FETCH requests and TXN_OFFSET_COMMIT requests, on current trunk we
run into a race condition inside UnifiedLog, causing a
`NoSuchElementException` in
`UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:651)`.
The cause is that the line a performing an `isPresent` check on a
volatile Optional before accessing it in `get`, leaving the door open to
a race condition when the optional changes between `isPresent` and
`get`. This change takes a copy of the volatile variable first.
This PR adds changes, so the IQ endpoint information is only sent to
streams group members when there has been a change in the assignments
requiring an update in the streams client host-partition ownership.
The existing IQ integration test passes with no modifications and
updated the `GroupMetadataManagerTest` to cover the new process.
Reviewers: Matthias Sax <mjsax@apache.org>, Lucas Brutschy
<lucasbru@apache.org>
Follow up https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR includes the system test file test_console_share_consumer.py
which tests the functioning of ConsoleShareConsumer
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch adds the `TopicId` field to the `OffsetCommitValue` record as
a tagged field. It will be later used on the offset fetch path to ensure
that the persisted offset matches the requested one.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Sean Quah
<squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
According to KIP-830, in Kafka 4 users currently relying on JmxReporter
and that are using additional reporters via metric.reporters will have
to include org.apache.kafka.common.metrics.JmxReporter in
metric.reporters.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
With KIP-1071 enabled, the main consumer is created differently, side
stepping `KafkaClientSupplier`.
To allow injection test wrappers, we add an internal ConsumerWrapper,
until we define a new public interface.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Migrate MinInSyncReplicasConfigTest to the server module
Reviewers: PoAn Yang <payang@apache.org>, Yung
<yungyung7654321@gmail.com>, TengYao Chi <frankvicky@apache.org>, Ken
Huang <s7133700@gmail.com>
This PR fixes a regression bug introduced with KAFKA-17203. We need to
pass in mutable collections into `closeTaskClean(...)`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bruno Cadonna <bruno@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Updates the min version used by `ListOffsetsRequest` to
`ApiKeys.LIST_OFFSETS.oldestVersion()` rather than hardcoding `1`.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponsejava`.
* Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest`
v0 and v1 requests.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Recent changes to BrokerMetadataPublisher seem to have caused a mismatch
between the BrokerMetadataPublisher and its test.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan
<smahajan@confluent.io>
### About
Minor improvements to Persister APIs in KafkaApis with respect to share
group protocol enablement
### Testing
The new code added has been tested with the help of unit tests
Reviewers: Sushant Mahajan <sushant.mahajan88@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
Group Coordinator Shards are not unloaded when `__consumer_offsets`
topic is deleted. The unloading is scheduled but it is ignored because
the epoch is equal to the current epoch:
```
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1]
Scheduling unloading of metadata for __consumer_offsets-0 with epoch
OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling
unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
```
This patch fixes the issue by not setting the leader epoch in this case.
The coordinator expects the leader epoch to be incremented when the
resignation code is called. When the topic is deleted, the epoch is not
incremented. Therefore, we must not use it. Note that this is aligned
with deleted partitions are handled too.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Adding test to specifically force the fencing path due to delayed
rebalance, and validate how the consumer recovers automatically.
Running this test and DEBUG log enabled, allows to see the details of
the fencing flow: consumer getting fenced due to rebalance exceeded,
resetting to epoch 0, rejoining on the next poll with the existing
subscription, and being accepted back in the group (so consumption
resumes)
This is aimed to help understand
[KAFKA-19233](https://issues.apache.org/jira/browse/KAFKA-19233)
Will add another one in separate PR to also involve commits in similar
fencing scenarios.
Reviewers: TengYao Chi <frankvicky@apache.org>
This fixes the paragraph splitting done in the pr-format.py script. If a
line contains a triple backtick, the script will consider it markdown
and adjust the formatting accordingly.
Reviewers: TengYao Chi <frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
* Previously we had added code to `GroupCoordinatorService.onElection`
to reconcile pending share group initializing topics. This was done to
manage state in case of failovers and broker failures.
* However, we later modified share group heartbeat code to do the retry
to clean up the state and the `onElection` code is now redundant.
* In this PR we are cleaning up this code.
Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
According to the recent changes in KIP-932, when the share session cache
is full and a broker receives a Share Fetch request with Initial Share
Session Epoch (0), then the error code `SHARE_SESSION_LIMIT_REACHED` is
returned after a delay of maxWaitMs. This PR implements this logic. In
order to add a delay between subsequent share fetch requests, the timer
is delayed operation purgatory is used. A new `IdleShareFetchTimeTask`
has been added which takes in a CompletableFuture<Void>. Upon the
expiration, this future is completed with null. When the future is
completes, a response is sent back to the client with the error code
`SHARE_SESSION_LIMIT_REACHED`
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
* We have a few periodic timer tasks in `ShareCoordinatorService` which
run continuously.
* With the recent introduction of share group enabled config at feature
level, we would like these jobs to stop when the aforementioned feature
is disabled.
* In this PR, we have added the functionality to make that possible.
* Additionally the service has been supplemented with addition of a
static share group config supplier.
* New test has been added.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>
If delete.topic.enable is false on the brokers, deleteTopics will mark
the topics for deletion, but not actually delete them. The futures will
return successfully in this case.
It is not true as the server return exception now.
```java if (!config.deleteTopicEnable) { if (apiVersion < 3) { throw
new InvalidRequestException("Topic deletion is disabled.") } else {
throw new TopicDeletionDisabledException() } } ```
Reviewers: Nick Guo <lansg0504@gmail.com>, PoAn Yang
<payang@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The PR is a minor follow up on
https://github.com/apache/kafka/pull/19659.
KafkaApis.scala already have a check which denies new share fetch
related calls if the share group is not supported. Hence no new sessions
shall be created so the requirement to have share group enabled flag in
ShareSessionCache is not needed, unless I am missing something.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>