Move `LoggingController` to server module and rewrite it in java.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
[KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to
finish the AlterShareGroupOffsets RPC.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Co-authored-by: jimmy <wangzhiwang@qq.com>
Use Java to rewrite PlaintextConsumerPollTest by new test infra and move
it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Fix the change in
https://github.com/apache/kafka/pull/19380/files#r2102980872
Use `LinkedHashMap` instead of `Set` to preserve the order of elements.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Sanskar Jhajharia
<sjhajharia@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
* Add new functions `listConfigResources(Set<ConfigResource.Type>
configResourceTypes, ListConfigResourcesOptions options)` and
`listConfigResources()` to `Admin` interface.
* New functions can list all kind of config resource types.
* If input is a set with a type other than `CLIENT_METRICS` and
request version is 0, return `UnsupportedVersionException`.
* Deprecate functions
`listClientMetricsResources(ListClientMetricsResourcesOptions options)`
and `listClientMetricsResources()`.
* Deprecate classes `ListClientMetricsResourcesResult` and
`ClientMetricsResourceListing`.
* Change `ClientMetricsCommand` to use `listConfigResources`.
* Add integration tests to `PlaintextAdminIntegrationTest.java`.
* Add unit tests to `KafkaAdminClientTest.java`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
### About
Consumer groups have a different timeout `REMOTE_FETCH_MAX_WAIT_MS_PROP`
in delayed fetch purgatory for fetch requests having remote storage
fetch ([code
link](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1669)).
This is done before the request enters the purgatory, so its easy to
change. At the moment share groups can only have a `waitTimeMs` `of
shareFetch.fetchParams().maxWaitMs` (default value `500ms`) for delayed
share fetch purgatory regardless of whether they are remote
storage/local log fetch.
This PR introduces a way to increase the timeout of remote storage fetch
requests if a remote storage fetch request couldn't complete within
`shareFetch.fetchParams().maxWaitMs`, then we create a timer task which
can be interrupted whenever `pendingFetches` is finished. The change has
been done to avoid the expiration of remote storage share fetch
requests.
### Testing
The code has been tested with the help of unit tests and
`LocalTieredStorage.java`
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Now that Kafka Brokers support Java 17, this PR makes some changes in
core module. The changes in this PR are limited to only the Scala files
in the Core module's tests. The unit tests module is still pending. It
shall follow next. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
To be clear, the directories being targeted in this PR are:
- core/src/test/scala/kafka
- core/src/test/scala/integration/kafka
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Use Java to rewrite `PlaintextConsumerCommitTest` by new test infra and
move it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR is based on the discussion here:
https://github.com/apache/kafka/pull/18929#discussion_r2083238645
Currently, the handle methods related to `shared‐group` requests are
inconsistent: some return `Unit` while others return
`CompletableFuture[Unit]` without a clear rationale. This PR
standardizes all of them to return `CompletableFuture[Unit]` and ensures
consistent error handling by chaining `.exceptionally(handleError)` to
each call site.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Follow up https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Migrate MinInSyncReplicasConfigTest to the server module
Reviewers: PoAn Yang <payang@apache.org>, Yung
<yungyung7654321@gmail.com>, TengYao Chi <frankvicky@apache.org>, Ken
Huang <s7133700@gmail.com>
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponsejava`.
* Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest`
v0 and v1 requests.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Recent changes to BrokerMetadataPublisher seem to have caused a mismatch
between the BrokerMetadataPublisher and its test.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan
<smahajan@confluent.io>
### About
Minor improvements to Persister APIs in KafkaApis with respect to share
group protocol enablement
### Testing
The new code added has been tested with the help of unit tests
Reviewers: Sushant Mahajan <sushant.mahajan88@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
Group Coordinator Shards are not unloaded when `__consumer_offsets`
topic is deleted. The unloading is scheduled but it is ignored because
the epoch is equal to the current epoch:
```
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1]
Scheduling unloading of metadata for __consumer_offsets-0 with epoch
OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling
unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
```
This patch fixes the issue by not setting the leader epoch in this case.
The coordinator expects the leader epoch to be incremented when the
resignation code is called. When the topic is deleted, the epoch is not
incremented. Therefore, we must not use it. Note that this is aligned
with deleted partitions are handled too.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Adding test to specifically force the fencing path due to delayed
rebalance, and validate how the consumer recovers automatically.
Running this test and DEBUG log enabled, allows to see the details of
the fencing flow: consumer getting fenced due to rebalance exceeded,
resetting to epoch 0, rejoining on the next poll with the existing
subscription, and being accepted back in the group (so consumption
resumes)
This is aimed to help understand
[KAFKA-19233](https://issues.apache.org/jira/browse/KAFKA-19233)
Will add another one in separate PR to also involve commits in similar
fencing scenarios.
Reviewers: TengYao Chi <frankvicky@apache.org>
According to the recent changes in KIP-932, when the share session cache
is full and a broker receives a Share Fetch request with Initial Share
Session Epoch (0), then the error code `SHARE_SESSION_LIMIT_REACHED` is
returned after a delay of maxWaitMs. This PR implements this logic. In
order to add a delay between subsequent share fetch requests, the timer
is delayed operation purgatory is used. A new `IdleShareFetchTimeTask`
has been added which takes in a CompletableFuture<Void>. Upon the
expiration, this future is completed with null. When the future is
completes, a response is sent back to the client with the error code
`SHARE_SESSION_LIMIT_REACHED`
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
* We have a few periodic timer tasks in `ShareCoordinatorService` which
run continuously.
* With the recent introduction of share group enabled config at feature
level, we would like these jobs to stop when the aforementioned feature
is disabled.
* In this PR, we have added the functionality to make that possible.
* Additionally the service has been supplemented with addition of a
static share group config supplier.
* New test has been added.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>
The PR is a minor follow up on
https://github.com/apache/kafka/pull/19659.
KafkaApis.scala already have a check which denies new share fetch
related calls if the share group is not supported. Hence no new sessions
shall be created so the requirement to have share group enabled flag in
ShareSessionCache is not needed, unless I am missing something.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Do not print the `"Unexpected error .."` log when the config
`delete.topic.enable` is false
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
In KRaft mode, controllerListenerName must always be specified, so we don't need an `Optional` to wrap it.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
This PR introduces the following per-broker metrics:
-`kafka.controller:type=KafkaController,name=BrokerRegistrationState,broker=X`
-`kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=X`
and this metric:
`kafka.controller:type=KafkaController,name=ControlledShutdownBrokerCount`
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.
Co-authored-by: John Doe <zh2725284321@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <yungyung7654321@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>
Add a warning message when using log.cleaner.enable to remind users that
this configuration is deprecated. Also, add a warning message for
log.cleaner.threads=0 because in version 5.0, the value must be greater
than zero.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <frankvicky@apache.org>, TaiJuWu
<tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
We updated the validation rule for cleanup.policy in remote storage
mode.
If remote log storage is enabled, only cleanup.policy=delete is allowed.
Any other value (e.g. compact, compact,delete) will now result in a
config validation error.
Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang
<s7133700@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
The PR adds ShareGroupListener which triggers on group changes, such as
when member leaves or group is empty. Such events are specific to
connection on respective broker. There events help to clean specific
states managed for respective member or group in various caches.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Move LeaderEndPoint to Server module
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
In Java return types are covariant. This means that method override can
override the return type with a subclass.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai
<chia7712@apache.org>, Apoorv Mittal <apoorvmittal10@gmail.com>
This PR creates a listener for `SharePartitionManager` to listen to any
changes in `ShareVersion` feature. In case, there is a toggle, we need
to change the attributes in `SharePartitionManager` accordingly.
Reviewers: Andrew Schofield <aschofield@confluent.io>
* Currently even if a user topic is deleted, its related records are not
deleted with respect to subscribed share groups from the GC and the SC.
* The event of topic delete is propagated from the
BrokerMetadataPublisher to the coordinators via the
`onPartitionsDeleted` method. This PR leverages this method to issue
cleanup calls to the GC and SC respectively.
* To prevent chaining of futures in the GC, we issue async calls to both
GC and SC independently and the methods take care of the respective
cleanups unaware of the other.
* This method is more efficient and transcends issues related to
timeouts/broker restarts resulting in chained future execution issues.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch extends the OffsetFetch API to support topic ids. From
version 10 of the API, topic ids must be used.
The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.
Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans <lmagrans@confluent.io>
Currently, the quorum uses kraft by default, so there's no need to
specify it explicitly.
For kip932 and isShareGroupTest, they are no longer used after #19542 .
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Migrates the `TxnTransitMetadata` class from scala to java, moving it
from to the `transaction-coordinator` module.
Reviewers: PoAn Yang <payang@apache.org>, Nick Guo
<lansg0504@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Admin.listConsumerGroups() was able to use the early versions of
ListGroups RPC with the version used dependent upon the filters the user
specified. Admin.listGroups(ListGroupsOptions.forConsumerGroups())
inadvertently required ListGroups v5 because it always set a types
filter. This patch handles the UnsupportedVersionException and winds
back the complexity of the request unless the user has specified filters
which demand a higher version.
It also adds ListGroupsOptions.forShareGroups() and forStreamsGroups().
The usability of Admin.listGroups() is much improved as a result.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
<payang@apache.org>
This PR moves SchedulerTest to server module and rewrite it with java.
Please also check updated import control config!
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
While working on https://github.com/apache/kafka/pull/19515, I came to
the conclusion that the OffsetFetchResponse is quite messy and overall
too complicated. This patch rationalize the constructors.
OffsetFetchResponse has a single constructor accepting the
OffsetFetchResponseData. A builder is introduced to handle the down
conversion. This will also simplify adding the topic ids. All the
changes are mechanical, replacing data structures by others.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
The PR adds `slice` API in `Records.java` and further implementation in
`MemoryRecords`. With the addition of ShareFetch and it's support to
read from TieredStorage, where ShareFetch might acquire subset of fetch
batches and TieredStorage emits MemoryRecords, hence a slice API is
needed for MemoryRecords as well to limit the bytes transferred (if
subset batches are acquired).
MemoryRecords are sliced using `duplicate` and `slice` API of
ByteBuffer, which are backed by the original buffer itself hence no-copy
is created rather position, limit and offset are changed as per the new
position and length.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Currently in the tests in ShareFetchAcknowledgeRequestTest, subsequent
share fetch / share acknowledge requests creates a new socket everytime,
even when the requests are sent by the same member. In reality, a single
share consumer clisnet will reuse the same socket for all the share
related requests in its lifetime. This PR changes the behaviour in the
tests to align with reality and reuse the same socket for all requests
by the same share group member.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
### About
Added code to handle share partition fetch lock cleanly in
`DelayedShareFetch` to avoid a member incorrectly releasing a share
partition's fetch lock
### Testing
The code has been tested with the help of unit tests and integration
tests.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Log a message when reading a batch that is larger than the currently
allocated batch.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, PoAn Yang
<payang@apache.org>
1. Move `ForwardingManagerMetrics` and `ForwardingManagerMetricsTest` to
server module.
2. Rewrite them in Java.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
rewrite `MetricsDuringTopicCreationDeletionTest` to `ClusterTest` infra
and move it to clients-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
The PR fixes the issue when ShareAcknowledgements are piggybacked on
ShareFetch. The current default configuration in clients sets `batch
size` and `max fetch records` as per the `max.poll.records` config,
default 500. Which means all records in a single poll will be fetched
and acknowledged. Also the default configuration for inflight records in
a partition is 200. Which means prior fetch records has to be
acknowledged prior fetching another batch from share partition.
The piggybacked share fetch-acknowledgement calls from KafkaApis are
async and later the response is combined. If respective share fetch
starts waiting in purgatory because all inflight records are currently
full, hence when startOffset is moved as part of acknowledgement, then a
trigger should happen which should try completing any pending share
fetch requests in purgatory. Else the share fetch requests wait in
purgatory for timeout though records are available, which dips the share
fetch performance.
The regular fetch has a single criteria to land requests in purgatory,
which is min bytes criteria, hence any produce in respective topic
partition triggers to check any pending fetch requests. But share fetch
can wait in purgatory because of multiple reasons: 1) Min bytes 2)
Inflight records exhaustion 3) Share partition fetch lock competition.
The trigger already happens for 1 and current PR fixes 2. We will
investigate further if there should be any handling required for 3.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
### About
11 of the test cases in `SharePartitionTest` have failed at least once
in the past 28 days.
https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.share.SharePartitionTest
Observing the flakiness, they seem to be caused due to the usage of
`SystemTimer` for various acquisition lock timeout related tests. I have
replaced the usage of `SystemTimer` with `MockTimer` and also improved
the `MockTimer` API with regard to removing the timer task entries that
have already been cancelled.
Also, this has reduced the time taken to run `SharePartitionTest` from
~6 sec to ~1.5 sec
### Testing
The testing has been done with the help of already present unit tests in
Apache Kafka.
Reviewers: Andrew Schofield <aschofield@confluent.io>