Use Java to rewrite `PlaintextConsumerCommitTest` by new test infra and
move it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR is based on the discussion here:
https://github.com/apache/kafka/pull/18929#discussion_r2083238645
Currently, the handle methods related to `shared‐group` requests are
inconsistent: some return `Unit` while others return
`CompletableFuture[Unit]` without a clear rationale. This PR
standardizes all of them to return `CompletableFuture[Unit]` and ensures
consistent error handling by chaining `.exceptionally(handleError)` to
each call site.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Follow up https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Migrate MinInSyncReplicasConfigTest to the server module
Reviewers: PoAn Yang <payang@apache.org>, Yung
<yungyung7654321@gmail.com>, TengYao Chi <frankvicky@apache.org>, Ken
Huang <s7133700@gmail.com>
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponsejava`.
* Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest`
v0 and v1 requests.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Recent changes to BrokerMetadataPublisher seem to have caused a mismatch
between the BrokerMetadataPublisher and its test.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan
<smahajan@confluent.io>
### About
Minor improvements to Persister APIs in KafkaApis with respect to share
group protocol enablement
### Testing
The new code added has been tested with the help of unit tests
Reviewers: Sushant Mahajan <sushant.mahajan88@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
Group Coordinator Shards are not unloaded when `__consumer_offsets`
topic is deleted. The unloading is scheduled but it is ignored because
the epoch is equal to the current epoch:
```
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1]
Scheduling unloading of metadata for __consumer_offsets-0 with epoch
OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling
unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
```
This patch fixes the issue by not setting the leader epoch in this case.
The coordinator expects the leader epoch to be incremented when the
resignation code is called. When the topic is deleted, the epoch is not
incremented. Therefore, we must not use it. Note that this is aligned
with deleted partitions are handled too.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Adding test to specifically force the fencing path due to delayed
rebalance, and validate how the consumer recovers automatically.
Running this test and DEBUG log enabled, allows to see the details of
the fencing flow: consumer getting fenced due to rebalance exceeded,
resetting to epoch 0, rejoining on the next poll with the existing
subscription, and being accepted back in the group (so consumption
resumes)
This is aimed to help understand
[KAFKA-19233](https://issues.apache.org/jira/browse/KAFKA-19233)
Will add another one in separate PR to also involve commits in similar
fencing scenarios.
Reviewers: TengYao Chi <frankvicky@apache.org>
According to the recent changes in KIP-932, when the share session cache
is full and a broker receives a Share Fetch request with Initial Share
Session Epoch (0), then the error code `SHARE_SESSION_LIMIT_REACHED` is
returned after a delay of maxWaitMs. This PR implements this logic. In
order to add a delay between subsequent share fetch requests, the timer
is delayed operation purgatory is used. A new `IdleShareFetchTimeTask`
has been added which takes in a CompletableFuture<Void>. Upon the
expiration, this future is completed with null. When the future is
completes, a response is sent back to the client with the error code
`SHARE_SESSION_LIMIT_REACHED`
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
* We have a few periodic timer tasks in `ShareCoordinatorService` which
run continuously.
* With the recent introduction of share group enabled config at feature
level, we would like these jobs to stop when the aforementioned feature
is disabled.
* In this PR, we have added the functionality to make that possible.
* Additionally the service has been supplemented with addition of a
static share group config supplier.
* New test has been added.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>
The PR is a minor follow up on
https://github.com/apache/kafka/pull/19659.
KafkaApis.scala already have a check which denies new share fetch
related calls if the share group is not supported. Hence no new sessions
shall be created so the requirement to have share group enabled flag in
ShareSessionCache is not needed, unless I am missing something.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Do not print the `"Unexpected error .."` log when the config
`delete.topic.enable` is false
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
In KRaft mode, controllerListenerName must always be specified, so we don't need an `Optional` to wrap it.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
This PR introduces the following per-broker metrics:
-`kafka.controller:type=KafkaController,name=BrokerRegistrationState,broker=X`
-`kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=X`
and this metric:
`kafka.controller:type=KafkaController,name=ControlledShutdownBrokerCount`
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.
Co-authored-by: John Doe <zh2725284321@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <yungyung7654321@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>
Add a warning message when using log.cleaner.enable to remind users that
this configuration is deprecated. Also, add a warning message for
log.cleaner.threads=0 because in version 5.0, the value must be greater
than zero.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <frankvicky@apache.org>, TaiJuWu
<tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
We updated the validation rule for cleanup.policy in remote storage
mode.
If remote log storage is enabled, only cleanup.policy=delete is allowed.
Any other value (e.g. compact, compact,delete) will now result in a
config validation error.
Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang
<s7133700@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
The PR adds ShareGroupListener which triggers on group changes, such as
when member leaves or group is empty. Such events are specific to
connection on respective broker. There events help to clean specific
states managed for respective member or group in various caches.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Move LeaderEndPoint to Server module
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
In Java return types are covariant. This means that method override can
override the return type with a subclass.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai
<chia7712@apache.org>, Apoorv Mittal <apoorvmittal10@gmail.com>
This PR creates a listener for `SharePartitionManager` to listen to any
changes in `ShareVersion` feature. In case, there is a toggle, we need
to change the attributes in `SharePartitionManager` accordingly.
Reviewers: Andrew Schofield <aschofield@confluent.io>
* Currently even if a user topic is deleted, its related records are not
deleted with respect to subscribed share groups from the GC and the SC.
* The event of topic delete is propagated from the
BrokerMetadataPublisher to the coordinators via the
`onPartitionsDeleted` method. This PR leverages this method to issue
cleanup calls to the GC and SC respectively.
* To prevent chaining of futures in the GC, we issue async calls to both
GC and SC independently and the methods take care of the respective
cleanups unaware of the other.
* This method is more efficient and transcends issues related to
timeouts/broker restarts resulting in chained future execution issues.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch extends the OffsetFetch API to support topic ids. From
version 10 of the API, topic ids must be used.
The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.
Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans <lmagrans@confluent.io>
Currently, the quorum uses kraft by default, so there's no need to
specify it explicitly.
For kip932 and isShareGroupTest, they are no longer used after #19542 .
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Migrates the `TxnTransitMetadata` class from scala to java, moving it
from to the `transaction-coordinator` module.
Reviewers: PoAn Yang <payang@apache.org>, Nick Guo
<lansg0504@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Admin.listConsumerGroups() was able to use the early versions of
ListGroups RPC with the version used dependent upon the filters the user
specified. Admin.listGroups(ListGroupsOptions.forConsumerGroups())
inadvertently required ListGroups v5 because it always set a types
filter. This patch handles the UnsupportedVersionException and winds
back the complexity of the request unless the user has specified filters
which demand a higher version.
It also adds ListGroupsOptions.forShareGroups() and forStreamsGroups().
The usability of Admin.listGroups() is much improved as a result.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
<payang@apache.org>
This PR moves SchedulerTest to server module and rewrite it with java.
Please also check updated import control config!
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
While working on https://github.com/apache/kafka/pull/19515, I came to
the conclusion that the OffsetFetchResponse is quite messy and overall
too complicated. This patch rationalize the constructors.
OffsetFetchResponse has a single constructor accepting the
OffsetFetchResponseData. A builder is introduced to handle the down
conversion. This will also simplify adding the topic ids. All the
changes are mechanical, replacing data structures by others.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
The PR adds `slice` API in `Records.java` and further implementation in
`MemoryRecords`. With the addition of ShareFetch and it's support to
read from TieredStorage, where ShareFetch might acquire subset of fetch
batches and TieredStorage emits MemoryRecords, hence a slice API is
needed for MemoryRecords as well to limit the bytes transferred (if
subset batches are acquired).
MemoryRecords are sliced using `duplicate` and `slice` API of
ByteBuffer, which are backed by the original buffer itself hence no-copy
is created rather position, limit and offset are changed as per the new
position and length.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Currently in the tests in ShareFetchAcknowledgeRequestTest, subsequent
share fetch / share acknowledge requests creates a new socket everytime,
even when the requests are sent by the same member. In reality, a single
share consumer clisnet will reuse the same socket for all the share
related requests in its lifetime. This PR changes the behaviour in the
tests to align with reality and reuse the same socket for all requests
by the same share group member.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
### About
Added code to handle share partition fetch lock cleanly in
`DelayedShareFetch` to avoid a member incorrectly releasing a share
partition's fetch lock
### Testing
The code has been tested with the help of unit tests and integration
tests.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Log a message when reading a batch that is larger than the currently
allocated batch.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, PoAn Yang
<payang@apache.org>
1. Move `ForwardingManagerMetrics` and `ForwardingManagerMetricsTest` to
server module.
2. Rewrite them in Java.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
rewrite `MetricsDuringTopicCreationDeletionTest` to `ClusterTest` infra
and move it to clients-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
The PR fixes the issue when ShareAcknowledgements are piggybacked on
ShareFetch. The current default configuration in clients sets `batch
size` and `max fetch records` as per the `max.poll.records` config,
default 500. Which means all records in a single poll will be fetched
and acknowledged. Also the default configuration for inflight records in
a partition is 200. Which means prior fetch records has to be
acknowledged prior fetching another batch from share partition.
The piggybacked share fetch-acknowledgement calls from KafkaApis are
async and later the response is combined. If respective share fetch
starts waiting in purgatory because all inflight records are currently
full, hence when startOffset is moved as part of acknowledgement, then a
trigger should happen which should try completing any pending share
fetch requests in purgatory. Else the share fetch requests wait in
purgatory for timeout though records are available, which dips the share
fetch performance.
The regular fetch has a single criteria to land requests in purgatory,
which is min bytes criteria, hence any produce in respective topic
partition triggers to check any pending fetch requests. But share fetch
can wait in purgatory because of multiple reasons: 1) Min bytes 2)
Inflight records exhaustion 3) Share partition fetch lock competition.
The trigger already happens for 1 and current PR fixes 2. We will
investigate further if there should be any handling required for 3.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
### About
11 of the test cases in `SharePartitionTest` have failed at least once
in the past 28 days.
https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.share.SharePartitionTest
Observing the flakiness, they seem to be caused due to the usage of
`SystemTimer` for various acquisition lock timeout related tests. I have
replaced the usage of `SystemTimer` with `MockTimer` and also improved
the `MockTimer` API with regard to removing the timer task entries that
have already been cancelled.
Also, this has reduced the time taken to run `SharePartitionTest` from
~6 sec to ~1.5 sec
### Testing
The testing has been done with the help of already present unit tests in
Apache Kafka.
Reviewers: Andrew Schofield <aschofield@confluent.io>
### About
This PR removes the limitation in remote storage fetch for share groups
of only performing remote fetch for a single topic partition in a share
fetch request. With this PR, share groups can now fetch multiple remote
storage topic partitions in a single share fetch request.
### Testing
I have followed the [AK
documentation](https://kafka.apache.org/documentation/#tiered_storage_config_ex)
to test my code locally (by adopting `LocalTieredStorage.java`) and
verify with the help of logs that remote storage is happening for
multiple topic partitions in a single share fetch request. Also,
verified it with the help of unit tests.
Reviewers: Jun Rao <junrao@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
The PR do following:
1. Move MetadataVersionIntegrationTest to clients-integration-tests
module
2. rewrite to java from scala
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In the return results of the methods beginningOffsets and endOffset, if
timeout == 0, then an empty Map should be returned uniformly instead of
in the form of <TopicPartition, null>
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet
Magrans <lmagrans@confluent.io>
1. remove org.apache.kafka.raft.OffsetAndEpoch
2. rewrite org.apache.kafka.server.common.OffsetAndEpoch by record
keyword
3. rename OffsetAndEpoch#leaderEpoch to OffsetAndEpoch#epoch
Reviewers: PoAn Yang <payang@apache.org>, Xuan-Zhang Gong
<gongxuanzhangmelt@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Propose adding a new filter TransactionalIdPattern. This transaction ID pattern filter works as AND with the other transaction filters. Also, it is empowered with Re2j.
KIP: https://cwiki.apache.org/confluence/x/4gm9F
Reviewers: Justine Olshan <jolshan@confluent.io>, Ken Huang
<s7133700@gmail.com>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
For records which are automatically released as a result of closing a
share session normally, the delivery count should not be incremented.
These records were fetched but they were not actually delivered to the
client since the disposition of the delivery records is carried in the
ShareAcknowledge which closes the share session. Any remaining records
were not delivered, only fetched.
This PR releases the delivery count for records when closing a share
session normally.
Co-authored-by: d00791190 <dinglan6@huawei.com>
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Up till now, the share sessions in the broker were only attempted to
evict when the share session cache was full and a new session was trying
to get registered. With the changes in this PR, whenever a share
consumer gets disconnected from the broker, the corresponding share
session would be evicted from the cache.
Note - `connectAndReceiveWithoutClosingSocket` has been introduced in
`GroupCoordinatorBaseRequestTest`. This method creates a socket
connection, sends the request, receives a response but does not close
the connection. Instead, these sockets are stored in a ListBuffer
`openSockets`, which are closed in tearDown method after each test is
run. Also, all the `connectAndReceive` calls in
`ShareFetchAcknowledgeRequestTest` have been replaced by
`connectAndReceiveWithoutClosingSocket`, because these tests depends
upon the persistence of the share sessions on the broker once
registered. But, with the new code introduced, as soon as the socket
connection is closed, a connection drop is assumed by the broker,
leading to session eviction.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This PR uses the v1 of the ShareVersion feature to enable share groups
for KIP-932.
Previously, there were two potential configs which could be used -
`group.share.enable=true` and including "share" in
`group.coordinator.rebalance.protocols`. After this PR, the first of
these is retained, but the second is not. Instead, the preferred switch
is the ShareVersion feature.
The `group.share.enable` config is temporarily retained for testing and
situations in which it is inconvenient to set the feature, but it should
really not be necessary, especially when we get to AK 4.2. The aim is to
remove this internal config at that point.
No tests should be setting `group.share.enable` any more, because they
can use the feature (which is enabled in test environments by default
because that's how features work). For tests which need to disable share
groups, they now set the share feature to v0. The majority of the code
changes were related to correct initialisation of the metadata cache in
tests now that a feature is used.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Add new StreamsGroupFeature, disabled by default, and add "streams" as
default value to `group.coordinator.rebalance.protocols`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<david.jacot@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>,
Justine Olshan <jolshan@confluent.io>, Andrew Schofield
<aschofield@confluent.io>, Jun Rao <jun@confluent.io>
- Add support topicId in `ProduceRequest`/`ProduceResponse`. Topic name
and Topic Id will become `ignorable` following the footstep of
`FetchRequest`/`FetchResponse`
- ReplicaManager still look for `HostedPartition` using `TopicPartition`
and doesn't check topic id. This is an **[OPEN QUESTION]** if we should
address this in this pr or wait for
[KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212) as this
will update `ReplicaManager::getPartition` to use `TopicIdParittion`
once we update the cache. Other option is that we compare provided
`topicId` with `Partition` topic id and return `UNKNOW_TOPIC_ID` or
`UNKNOW_TOPIC_PARTITION` if we can't find partition with matched topic
id.
Reviewers: Jun Rao <jun@confluent.io>, Justine Olshan
<jolshan@confluent.io>
While working on https://github.com/apache/kafka/pull/19515, I came to
the conclusion that the OffsetFetchRequest is quite messy and overall
too complicated. This patch rationalize the constructors.
OffsetFetchRequest has a single constructor accepting the
OffsetFetchRequestData. This will also simplify adding the topic ids.
All the changes are mechanical, replacing data structures by others.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Lianet Magran <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This is a follow up PR for implementation of DeleteShareGroupOffsets
RPC. This PR adds the ShareGroupStatePartitionMetadata record to
__consumer__offsets topic to make sure the topic is removed from the
initializedTopics list. This PR also removes partitions from the request
and response schemas for DeleteShareGroupState RPC
Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Use Java to rewrite `PlaintextConsumerFetchTest` by new test infra and
move it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
Old bootstrap.metadata files cause problems with server that include
KAFKA-18601. When the server tries to read the bootstrap.checkpoint
file, it will fail if the metadata.version is older than 3.3-IV3
(feature level 7). This causes problems when these clusters are
upgraded.
This PR makes it possible to represent older MVs in BootstrapMetadata
objects without causing an exception. An exception is thrown only if we
attempt to access the BootstrapMetadata. This ensures that only the code
path in which we start with an empty metadata log checks that the
metadata version is 7 or newer.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Ismael Juma
<ismael@juma.me.uk>, PoAn Yang <payang@apache.org>, Liu Zeyu
<zeyu.luke@gmail.com>, Alyssa Huang <ahuang@confluent.io>
This PR marks the records as non-nullable for ShareFetch.
This PR is as per the changes for Fetch:
https://github.com/apache/kafka/pull/18726 and some work for ShareFetch
was done here: https://github.com/apache/kafka/pull/19167. I tested with
marking `records` as non-nullable in ShareFetch, which required
additional handling. The same has been fixed in current PR.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>, PoAn Yang
<payang@apache.org>
The generated response data classes take Readable as input to parse the
Response. However, the associated response objects take ByteBuffer as
input and thus convert them to Readable using `new ByteBufferAccessor`
call.
This PR changes the parse method of all the response classes to take the
Readable interface instead so that no such conversion is needed.
To support parsing the ApiVersionsResponse twice for different version
this change adds the "slice" method to the Readable interface.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Truc Nguyen
<[trnguyen@confluent.io](mailto:trnguyen@confluent.io)>, Aadithya
Chandra <[aadithya.c@gmail.com](mailto:aadithya.c@gmail.com)>
Change the log messages which used to warn that KIP-932 was an Early
Access feature to say that it is now a Preview feature. This will make
the broker logs far less noisy when share groups are enabled.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This PR removes the group.share.max.groups config. This config was used
to calculate the maximum size of share session cache. But with the new
config group.share.max.share.sessions in place with exactly this
purpose, the ShareSessionCache initialization has also been passed the
new config.
Refer: [KAFKA-19156](https://issues.apache.org/jira/browse/KAFKA-19156)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch extends the OffsetCommit API to support topic ids. From
version 10 of the API, topic ids must be used. Originally, we wanted to
support both using topic ids and topic names from version 10 but it
turns out that it makes everything more complicated. Hence we propose to
only support topic ids from version 10. Clients which only support using
topic names can either lookup the topic ids using the Metadata API or
stay on using an earlier version.
The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, PoAn Yang <payang@apache.org>
Currently the share session cache is desgined like the fetch session
cache. If the cache is full and a new share session is trying to get get
initialized, then the sessions which haven't been touched for more than
2minutes are evicted. This wouldn't be right for share sessions as the
members also hold locks on the acquired records, and session eviction
would mean theose locks will need to be dropped and the corresponding
records re-delivered. This PR removes the time based eviction logic for
share sessions.
Refer: [KAFKA-19159](https://issues.apache.org/jira/browse/KAFKA-19159)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR removes the unstable API flag for the KIP-932 RPCs.
The 4 RPCs which were exposed for the early access release in AK 4.0 are
stabilised at v1. This is because the RPCs have evolved over time and AK
4.0 clients are not compatible with AK 4.1 brokers. By stabilising at
v1, the API version checks prevent incompatible communication and
server-side exceptions when trying to parse the requests from the older
clients.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This PR adds the support for remote storage fetch for share groups.
There is a limitation in remote storage fetch for consumer groups that
we can only perform remote fetch for a single topic partition in a fetch
request. Since, the logic of share fetch requests is largely based on
how consumer groups work, we are following similar logic in implementing
remote storage fetch. However, this problem should be addressed as
part of KAFKA-19133 which should help us perform fetch for multiple
remote fetch topic partition in a single share fetch request.
Reviewers: Jun Rao <junrao@gmail.com>
The check for `scheduler.pendingTaskSize()` may fail if the thread pool
is too slow to consume the runnable objects
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
1. Remove `RemoteLogManager#startup` and
`RemoteLogManager#onEndpointCreated`
2. Move endpoint creation to `BrokerServer`
3. Move `RemoteLogMetadataManager#configure` and
`RemoteLogStorageManager#configure` to RemoteLogManager constructor
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
For segments that are uploaded to remote, RemoteIndexCache caches the
fetched offset, timestamp, and transaction index entries on the first
invocation to remote, then the subsequent invocations are accessed from
local.
The remote indexes that are cached locally gets removed on two cases:
1. Remote segments that are deleted due to breach by retention size/time
and start-offset.
2. The number of cached indexes exceed the remote-log-index-cache size
limit of 1 GB (default).
There are two layers of locks used in the RemoteIndexCache. First-layer
lock on the RemoteIndexCache and the second-layer lock on the
RemoteIndexCache#Entry.
**Issue**
1. The first-layer of lock coordinates the remote-log reader and deleter
threads. To ensure that the reader and deleter threads are not blocked
on each other, we only take `lock.readLock()` when accessing/deleting
the cached index entries.
2. The issue happens when both the reader and deleter threads took the
readLock, then the deleter thread marked the index as
`markedForCleanup`. Now, the reader thread which holds the `indexEntry`
gets an IllegalStateException when accessing it.
3. This is a concurrency issue, where we mark the entry as
`markedForCleanup` before removing it from the cache. See
RemoteIndexCache#remove, and RemoteIndexCache#removeAll methods.
4. When an entry gets evicted from cache due to breach by maxSize of 1
GB, then the cache remove that entry before calling the evictionListener
and all the operations are performed atomically by caffeine cache.
**Solution**
1. When the deleter thread marks an Entry for deletion, then we rename
the underlying index files with ".deleted" as suffix and add a job to
the remote-log-index-cleaner thread which perform the actual cleanup.
Previously, the indexes were not accessible once it was marked for
deletion. Now, we allow to access those renamed files (from entry that
is about to be removed and held by reader thread) until those relevant
files are removed from disk.
2. Similar to local-log index/segment deletion, once the files gets
renamed with ".deleted" as suffix then the actual deletion of file
happens after `file.delete.delay.ms` delay of 1 minute. The renamed
index files gets deleted after 30 seconds.
3. During this time, if the same index entry gets fetched again from
remote, then it does not have conflict with the deleted entry as the
file names are different.
Reviewers: Satish Duggana <satishd@apache.org>
The final part of KIP-1043 is to deprecate Admin.listConsumerGroups() in
favour of Admin.listGroups() which works for all group types.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
Separates metadata-related configurations from the `KRaftConfigs` into
the `MetadataLogConfig` class.
Previously, metadata-related configs were placed in `KRaftConfigs`,
which mixed server-related configs (like process.roles) with
metadata-specific ones (like metadata.log.*), leading to confusion and
tight coupling.
In this PR:
- Extract metadata-related config definitions and variables from
`KRaftConfig` into `MetadataLogConfig`.
- Move `node.id` out of `MetadataLogConfig` into `KafkaMetadataLog’s
constructor` to avoid redundant config references.
- Leave server-related configurations in `KRaftConfig`, consistent with
its role.
This separation makes `KafkaConfig` and `KRaftConfig` cleaner, and
aligns with the goal of having a dedicated MetadataLogConfig class for
managing metadata-specific configurations.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Pretty much a straight forward move of these classes. I just updated
`RemoteLogManagerTest` to not use `KafkaConfig`
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
the following tasks should be addressed in this ticket rewrite it by
1. new test infra
2. use java
3. move it to clients-integration-test
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This is a follow-up clean of #19387 Since we no longer access the log
cleaner config from `KafkaConfig`, we should remove these unused fields.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This patch updates the `GroupCoordinator` interface to use
`AuthorizableRequestContext` instead of using `RequestContext`. It makes
the interface more generic. The only downside is that the request
version in `AuthorizableRequestContext` is an `int` instead of a `short`
so we had to adapt it in a few places. We opted for using `int` directly
wherever possible.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
The `DelayedElectLeader` is only used in `TestReplicaManager`, but there
is no reference in it, so we can safely remove it.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813) added
`Topic:Describe` authorization of topics matching regex patterns to the
group coordinator since it was difficult to authorize these in the
broker when processing consumer heartbeats using the new protocol. But
group coordinator is started in `BrokerServer` before the authorizer is
created. And hence group coordinator doesn't have an authorizer and
never performs authorization. As a result, topics that are not
authorized for `Describe` may be assigned to consumers. This potentially
leaks information about topic existence, topic id and partition count to
users who are not authorized to describe a topic. This PR starts
authorizer earlier to ensure that authorization is performed by the
group coordinator. Also adds integration tests for verification.
Note that we still have a second issue when members have different
permissions. If regex is resolved by a member with permission to more
topics, unauthorized topics may be assigned to members with lower
permissions. In this case, we still return assignment containing topic
id and partitions to the member without `Topic:Describe` access. This is
not addressed by this PR, but an integration test that illustrates the
issue has been added so that we can verify when the issue is fixed.
Reviewers: David Jacot <david.jacot@gmail.com>
Use Java to rewrite `PlaintextConsumerCallbackTest` by new test infra
and move it to client-integration-tests module.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
JIRA: KAFKA-13610 This patch deprecates the `log.cleaner.enable`
configuration. It's part of
[KIP-1148](https://cwiki.apache.org/confluence/x/XAyWF).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
<payang@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>
This also adds metrics to StandardAuthorizer
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu
<tjwu1217@gmail.com>
This patch adds ACL support for 2PC as a part of KIP-939
A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT
((byte) 15 . When InitProducerId comes with enable2Pc=true, it would
have to have both WRITE and TWO_PHASE_COMMIT operation enabled on the
transactional id resource.
The kafka-acls.sh tool is going to support a new --operation
TwoPhaseCommit.
Reviewers: Artem Livshits <alivshits@confluent.io>, PoAn Yang
<poan.yang@suse.com>, Justine Olshan <jolshan@confluent.io>
include three test case
- ProducerCompressionTest
- ProducerFailureHandlingTest
- ProducerIdExpirationTest
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
1. Remove unused parameter from KafkaMetadataLog.
2. Give Utils.closeQuietly a meaningful name when closing reader.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
---------
Co-authored-by: TengYao Chi <kitingiao@gmail.com>
Both AddPartitionsToTxnConfig and TransactionStateManagerConfig are
static configs and they don't have specific config check. We can move
them out of KafkaConfig to simplify KafkaConfig.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1. Migrate DelegationTokenManager to server module.
2. Rewrite DelegationTokenManager in Java.
3. Move DelegationTokenManagerConfigs out of KafkaConfig.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
As of 3.9, Kafka allows disabling remote storage on a topic after it was
enabled. It allows subsequent enabling and disabling too.
However the documentation says otherwise and needs to be corrected.
Doc:
https://kafka.apache.org/39/documentation/#topicconfigs_remote.storage.enable
Reviewers: Luke Chen <showuon@gmail.com>, PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>
As described in the JIRA ticket, `controlPlaneRequestChannelOpt` was
removed from KRaft mode, so there's no need to use the metrics prefix
anymore.
This change removes `metricNamePrefix` from RequestChannel and the
related files.
It also removes `DataPlaneAcceptor#MetricPrefix`, since
`DataPlaneAcceptor` is the only implementation of `Acceptor`.
Since the implementation of KIP-291 is essentially removed, we can also
remove `logAndThreadNamePrefix` and `DataPlaneAcceptor#ThreadPrefix`.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Move LogCleaner and related classes to storage module and rewrite in
Java.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Jun Rao <junrao@gmail.com>
This PR proposes a switch to enable share groups for 4.1 (preview) and
4.2 (GA).
* `share.version=1` to indicate that share groups are enabled. This is
used as the switch for turning share groups on and off.
In 4.1, the default will be `share.version=0`. Then a user wanting to
evaluate the preview of KIP-932 would use `bin/kafka-features.sh
--bootstrap.server xxxx upgrade --feature share.version=1`.
In 4.2, the default will be `share.version=1`.
Reviewers: Jun Rao <junrao@gmail.com>
Use Java to rewrite `TransactionsWithMaxInFlightOneTest` by new test
infra and move it to client-integration-tests module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
It seems `timeMs` this parameter never used in Kafka project, the method
init commit is
b5f90daf13
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
* We wish to track the time of creation of the `ShareSnapshot` records
so that automated jobs could force their creation if a share partition
has gone cold (no updates for a specified time interval).
* To accomplish this, we have added 2 new fields `CreateTimestamp` and
`WriteTimestamp` in the `ShareSnapshot` record.
* The former tracks snapshot creation due to regular RPC calls while the
latter will track snapshots created by periodic jobs.
* In this PR we have made the requisite changes.
* This is a first of a series of PRs to create the automated jobs and
associated scaffolding.
Reviewers: Andrew Schofield <aschofield@confluent.io>
JIRA: KAFKA-18935 This patch ensures the broker will not return null
records in FetchResponse. For more details, please refer to the
ticket.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai
<chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
Add a note to the group protocol configuration that streams groups are
in early access and should not be used in production.
Also update an outdated comment related to disabling the protocol.
Reviewers: Bruno Cadonna <cadonna@apache.org>
This PR adds the share group dynamic config `share.isolation.level`.
Until now, share groups only supported `READ_UNCOMMITTED` isolation
level type. With this PR, we aim to support `READ_COMMITTED` isolation
type to share groups.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
This PR contains the implementation of KafkaAdminClient and
GroupCoordinator for DeleteShareGroupOffsets RPC.
- Added `deleteShareGroupOffsets` to `KafkaAdminClient`
- Added implementation for `handleDeleteShareGroupOffsetsRequest` in
`KafkaApis.scala`
- Added `deleteShareGroupOffsets` to `GroupCoordinator` as well.
internally this makes use of `persister.deleteState` to persist the
changes in share coordinator
Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
> INFO [data-plane Kafka Request Handler on Broker 3000], Resizing
request handler thread pool size from 8 to 10
(kafka.server.KafkaRequestHandlerPool)
it should be "Controller" rather than "Broker"
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Use the ProcessRole enum instead of hardcoding the role
Reviewers: Mickael Maison <mickael.maison@gmail.com>, PoAn Yang
<poan.yang@suse.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>
The `lastOffset` is not used actually, so it can be removed.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch renames the KIP-405 Plugin variable from
`remoteLogStorageManager` to `remoteStorageManager`. After [writing
about
it](https://aiven.io/blog/apache-kafka-tiered-storage-in-depth-how-writes-and-metadata-flow),
I realized I got swayed by the code and called the component incorrectly
- the official name doesn't have `Log` in it. I thought i'd go ahead and
change the code so it's consistent with the naming too
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Now that Kafka Brokers support Java 17, this PR makes some changes in
core module. The changes in this PR are limited to only the Java files
in the Core module. Scala related changes may follow next.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
- Some changes to use enhanced switch statement.
Reviewers: Andrew Schofield <aschofield@confluent.io>, PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>
## Summary
This PR updates the `RecordVersion` javadoc for clarity. It removes
outdated references to `message.format.version` mentioned in the [Kafka
4.0 upgrade
documentation](48f06981ee/40/upgrade.html (L135))
and aligns with feedback from a previous discussion in [#19325
](https://github.com/apache/kafka/pull/19325).
## Changes
- Cleaned up javadoc in `RecordVersion`
- Removed outdated or deprecated references
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch moves `BrokerReconfigurable` to the `server-common module`
and decouples the `TransactionLogConfig` and `KafkaConfig` to unblock
KAFKA-14485.
Reviewers: PoAn Yang <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
`Processor#accept` accepts a metric which tracks the amount of time for
which the Acceptor thread was blocked. It's misleading to name it
`acceptorIdlePercentMeter` and this change updates its naming to align
with the call site.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
Use Java to rewrite `TransactionsExpirationTest` by new test infra and
move it to client-integration-tests module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The test `testMultipleConcurrentShareFetches` is throwing a silent
exception.
`ERROR Error processing delayed share fetch request
(kafka.server.share.DelayedShareFetch:225)`
This is due to incomplete mocks setup for the test and also requires
changes in timeout.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This PR approaches completion of Admin.listShareGroupOffsets() and
kafka-share-groups.sh --describe --offsets.
Prior to this patch, kafka-share-groups.sh was only able to describe the
offsets for partitions which were assigned to active members. Now, the
Admin.listShareGroupOffsets() uses the persister's knowledge of the
share-partitions which have initialised state. Then, it uses this list
to obtain a complete set of offset information.
The PR also implements the topic-based authorisation checking. If
Admin.listShareGroupOffsets() is called with a list of topic-partitions
specified, the authz checking is performed on the supplied list,
returning errors for any topics to which the client is not authorised.
If Admin.listShareGroupOffsets() is called without a list of
topic-partitions specified, the list of topics is discovered from the
persister as described above, and then the response is filtered down to
only show the topics to which the client is authorised. This is
consistent with other similar RPCs in the Kafka protocol, such as
OffsetFetch.
Reviewers: David Arthur <mumrah@gmail.com>, Sushant Mahajan <smahajan@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Added `trace` logs to help debug `nextFetchOffset` functionality within
SharePartition. We did not have a way to figure out the fetch offsets of
a share partition through logs. Forward moving `fetchOffset` confirms
that the consumption from a given share partition is happening correctly
on the broker.
Reviewers: Andrew Schofield <aschofield@confluent.io>
There are two change for this PR.
1. Move `BrokerCompressionTest ` from core to storage
2. Rewrite `BrokerCompressionTest ` from scala to java
Reviewers: TengYao Chi <kitingiao@gmail.com>, PoAn Yang
<payang@apache.org>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
With the new Streams rebalance protocol, the Streams client sends the
topology with the used topics to the broker for initialization. For the
initialization the broker needs to describe the topics in the topology
and consequently the Streams application needs to be authorized to
describe the topics.
The broker checks the authorization by filtering the topics in the
topology by authorization. This filtering implicitly deduplicates the
topics of the topology if they appear multiple times in the topology
send to the brokers. After that the broker compares the size of the
authorized topics with the topics in the topology. If the authorized
topics are less than the topics in the topology a
TOPIC_AUTHORIZATION_FAILED error is returned.
In Streams a topology that is sent to the brokers likely has duplicate
topics because a repartition topic appears as a sink for one subtopology
and as a source for another subtopology.
This commit deduplicates the topics of a topology before the
verification of the authorization.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The `PlaintextAdminIntegrationTest#testDeleteRecordsAfterCorruptRecords`
was only enabled for classic protocol. Add consumer protocol to it.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The `metadataVersionSupplier` is unused after this - remove it.
Also remove redundant `metadataVersion.fetchRequestVersion >= 13` check
in `RemoteLeaderEndPoint` - the minimum version returned by this method
is `13`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Minor PR to correct the check for the presence of acquisition lock in
`assertionFailedMessage` method in `SharePartitionTest`
Reviewers: Andrew Schofield <aschofield@confluent.io>
PR add `MaxRecords` to share fetch request and also adds
`AcquisitionLockTimeout` to share fetch response. PR also removes
internal broker config of `max.fetch.records`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This is a small follow-up of https://github.com/apache/kafka/pull/19290.
The `actionQueue` argument is only used by the
`CoordinatorPartitionWriter` so we can remove it from the other methods
now.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
This patch moves `ConsumerTopicCreationTest` to the
`client-integration-tests` and rewrite it as Java.
The patch also streamlines the test flow.
In the Scala version, there is a producer that produces messages, but
this is not the main purpose of the `ConsumerTopicCreationTest`.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The zookeeper mode was removed in 4.0. The test cases don't need to
specify quorum. Following variable and functions can be replaced:
- TestWithParameterizedQuorumAndGroupProtocolNames
- getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
- getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly
- getTestQuorumAndGroupProtocolParametersAll
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch addresses a weirdness on the GroupCoordinator write path. The
`CoordinatorPartitionWriter` uses the `ReplicaManager#appendRecords`
method with `acks=1` and it expects it to completes
immediately/synchronously. It works because this is effectively what the
method does with `acks=1`. The issue is that fundamentally the method is
asynchronous so the contract is really fragile. This patch changes it by
introducing new method `ReplicaManager.appendRecordsToLeader`, which is
synchronous. It also refactors `ReplicaManager#appendRecords` to use
`ReplicaManager.appendRecordsToLeader` so we can benefits from all the
existing tests.
Reviewers: Fred Zheng <fzheng@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This patch applies a few cleanups to uniformize how group coordinator's
integration tests are setup.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Move LogCleanerManager and related classes to storage module and rewrite
in Java.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao
<junrao@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
* This PR adds impl for the initialize share groups call from the Group
Coordinator perspective.
* The initialize call on persister instance will be invoked by the
`GroupCoordinatorService`, based on the response of the
`GroupCoordinatorShard.shareGroupHeartbeat`. If there is new topic
subscription or member assignment change (topic paritions incremented),
the delta share partitions corresponding to the share group in question
are returned as an optional initialize request.
* The request is then sent to the share coordinator as an encapsulated
timer task because we want the heartbeat response to go asynchronously.
* Tests have been added for `GroupCoordinatorService` and
`GroupMetadataManager`. Existing tests have also been updated.
* A new formatter `ShareGroupStatePartitionMetadataFormatter` has been
added for debugging.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The generated request data type's constructors take Readable as an input. However, the parse method in the
AbstractRequest takes a ByteBuffer as input. So to create the corresponding request data objects, each individual
concrete Request classes wraps the ByteBuffer into a ByteBufferAccessor.
This is boilerplate code present in all the concrete request classes. This changes AbstractRequest's parse method so that subclasses can simply pass the `Readable` they get directly to request data classes.
The same change is made to the serialize method to maintain symmetry.
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio
<jsancio@apache.org>, Artem Livshits <alivshits@confluent.io>,
Truc Nguyen <trnguyen@confluent.io>
Migrate ConsumerRebootstrapTest to the new test infra and remove the old
Scala test.
The PR changed three things.
* Migrated `ConsumerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
* Removed the `RebootstrapTest.scala`.
Default `ConsumerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
The test case for the consumer with enabled rebootstrap

The test case for the consumer with disabled rebootstrap

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR fixes a potential issue where the `FetchResponse` returns
`divergingEndOffsets` with an older leader epoch. This can lead to
committed records being removed from the follower's log, potentially
causing data loss.
In detail:
`processFetchRequest` gets the requested leader epoch of partition data
by `topicPartition` and compares it with the leader epoch of the current
fetch state. If they don't match, the response is ignored.
Reviewers: Jun Rao <junrao@gmail.com>
This patch fixes another bug in the FindCoordinator API handling for
share partition key. `shareCoordinator.foreach` returns `Unit` so
`shareCoordinator.foreach(coordinator =>
coordinator.partitionFor(SharePartitionKey.getInstance(key)))` does not
return the partition for the key.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
JIRA: KAFKA-18576
After removing ZooKeeper, we no longer need to exclude `client_metrics`
and `group` from `ConfigType#ALL`.
Since it's a common pattern to provide a mechanism to know all values in
enumeration ( Java enum provides ootb), we should convert ConfigType to
enum.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
At the moment, the FindCoordinator API returns an `UNKNOWN_SERVER_ERROR`
error when the share partition key is invalid. It seems that the aim was
to return an `INVALID_REQUEST` error but the code has a small bug
preventing it from working as expected.
Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This PR adds extra information in assertion failed messages for tests in
SharePartitionTest revolving around acquisition lock timeouts
functionality.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The PR changed three things.
* Migrated `ProducerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
Default `ProducerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
The test case for the producer with enabled rebootstrap
<img width="1549" alt="Screenshot 2025-03-17 at 10 46 03 PM"
src="https://github.com/user-attachments/assets/547840a6-d79d-4db4-98c0-9b05ed04cf60"
/>
The test case for the producer with disabled rebootstrap
<img width="1552" alt="Screenshot 2025-03-17 at 10 46 47 PM"
src="https://github.com/user-attachments/assets/2248e809-d9d5-4f3b-a24f-ba1aa0fef728"
/>
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch is the third of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.
It removes the old group coordinator and cleans up the
`GroupCoordinator` interface.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
ReplicaSelector implementations can implement Monitorable to register their own metrics.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang <s7133700@gmail.com>
This patch is the second of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.
The patch removes `group.coordinator.new.enable` (internal config) and
all its usages (integration tests, unit tests, etc.). It also cleans up
`KafkaApis` to remove logic only used by the old group coordinator.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Rewrite the class in Java and move it to the raft module.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Reverts commit
2723dbf3a0
and
269e8892ad.
Instead of reopening the transaction index, it cancels the
RemoteFetchTask without interrupting it--avoiding to close the
TransactionIndex channel.
This will lead to complete the execution of the remote fetch but ignoring
the results. Given that this is considered a rare case, we could live
with this. If it becomes a performance issue, it could be optimized.
Reviewers: Jun Rao <junrao@gmail.com>
This patch is to move `DynamicProducerStateManagerConfig` and `BrokerReconfigurable` to the server module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This patch filters out the topic describe unauthorized topics from the
StreamsGroupHeartbeat and StreamsGroupDescribe response.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This patch adds logic to enable and handle two phase commit (2PC)
transactions following KIP-939.
The changes made are as follows:
1) Add a new broker config called
**transaction.two.phase.commit.enable** which is set to false by default
2) Add new flags **enableTwoPCFlag** and **keepPreparedTxn** to
handleInitProducerId
3) Return an error if keepPreparedTxn is set to true (for now)
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan
<jolshan@confluent.io>
During broker startup, attempt to read dynamic configurations from latest local snapshot on disk. This will avoid most situations where the static configuration is not sufficient to start up, but the dynamic configuration would have been. The PR includes an integration test.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Move share consumer to clients-integration-tests module and use `@BeforeEach` to setup
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
see discussion in
[KAFKA-18735](https://issues.apache.org/jira/browse/KAFKA-18735) - the
test should include following check.
1. Using name=<default> does not create default quota
2. the returned entity should have name=<default>
2. the filter `ClientQuotaFilterComponent.ofDefaultEntity` should return
nothing
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Given that the `core` module will be separated into other small modules,
this test will not be added to the core module.
Instead, I added it to the `clients-integration-tests` module since it
focuses on the admin client test. The patch should include following test cases.
1. a topic-related static config is added to quorum controller. The
configs from topic creation should include it, but `describeConfigs`
does not.
2. a topic-related static config is added to quorum controller. The
configs from topic creation should include it, and `describeConfigs`
does if admin is using controller.bootstrap
3. a topic-related static config is added to broker. The configs from
topic creation should NOT include it, but `describeConfigs` does.
4. a topic-related static config is added to broker. The configs from
topic creation should NOT include it, and `describeConfigs` does not
also if admin is using controller.bootstrap
for another, the docs of `STATIC_BROKER_CONFIG` should remind the impact of "controller.properties" BTW, those test cases should leverage new test infra, since new test infra allow us to define configs to broker/controller individually.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Avoid redundant MetricName creation in BaseQuotaTest#produceUntilThrottled via moving metrics creation out of loop.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This PR brings client metrics configuration resources in line with the
other config resources in terms of handling synonyms and defaults.
Specifically, configs which are not explicitly set take their hard-coded
default values, and these are reported by `kafka-configs.sh --describe`
and `Kafka-client-metrics.sh --describe`. Previously, they were omitted
which means the administrator needed to know the default values.
The ConfigHelper was changed so that the handling of client metrics
configuration matches that of group configuration.
Reviewers: poorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This change implements the broker-side configs proposed in KIP-1071.
The configurations implemented by this PR are only those that were specifically aimed to be included in `AK 4.1`.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This PR aims to remove the usage of partition max bytes from share fetch
requests. Partition Max Bytes is being defined by
`PartitionMaxBytesStrategy` which was added to the broker as part of PR
https://github.com/apache/kafka/pull/17870
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
The PR fixes the behaviour when records are fetched which are larger
than `fetch.max.bytes` config.
The usage of `hardMaxBytesLimit` is in ReplicaManager where it decides
whether to fetch a single record or not. The file records get sliced
based on the bytes requested. However, if `hardMaxBytesLimit` is false
then at least one record is fetched and bytes are adjusted accordingly in
`localLog`.
Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit <adixit@confluent.io>
Recently, we found a regression that could have been detected by static
analysis, since a local variable wasn't being passed to a method during
a refactoring, and was left unused. It was fixed in
[7a749b5](7a749b589f),
but almost slipped into 4.0. Unused variables are typically detected by
IDEs, but this is insufficient to prevent these kinds of bugs. This
change enables unused local variable detection in checkstyle for Kafka.
A few notes on the usage:
- There are two situations in which people actually want to have a local
variable but not use it. First, there are `for (Type ignored:
collection)` loops which have to loop `collection.length` number of
times, but that do not use `ignored` in the loop body. These are
typically still easier to read than a classical `for` loop. Second, some
IDEs detect it if a return value of a function such as `File.delete` is
not being used. In this case, people sometimes store the result in an
unused local variable to make ignoring the return value explicit and to
avoid the squiggly lines.
- In Java 22, unsued local variables can be omitted by using a single
underscore `_`. This is supported by checkstyle. In pre-22 versions,
IntelliJ allows such variables to be named `ignored` to suppress the
unused local variable warning. This pattern is often (but not
consistently) used in the Kafka codebase. This is, however, not
supported by checkstyle.
Since we cannot switch to Java 22, yet, and we want to use automated
detection using checkstyle, we have to resort to prefixing the unused
local variables with `@SuppressWarnings("UnusedLocalVariable")`. We have
to apply this in 11 cases across the Kafka codebase. While not being
pretty, I'd argue it's worth it to prevent bugs like the one fixed in
[7a749b5](7a749b589f).
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur
<mumrah@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bruno
Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>
* Add `DynamicThreadPool.java` to the server module.
* Remove the old DynamicThreadPool object in the `DynamicBrokerConfig.scala`.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The kafka controllers need to set kraft.version in their
ApiVersionsResponse messages according to the current kraft.version
reported by the Raft layer. Instead, currently they always set it to 0.
Also remove FeatureControlManager.latestFinalizedFeatures. It is not
needed and it does a lot of copying.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When producers send future timestamps, time retention based log segments
may get blocked from removal for an extended period of time. Log
cleaning should should warn in the logs when this scenario occurs.
Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Migrate AdminClientRebootstrapTest to the new test infra and remove the
old Scala test.
Reviewers: TengYao Chi <kitingiao@gmail.com>, David Arthur <mumrah@gmail.com>
User testing of the `KafkaShareConsumer` interface has revealed some
areas which confuse people. One of these is that way that it decides
whether you want to use implicit or explicit acknowledgement of records
by observing which calls the application issues. We are taking the
opportunity to refine the interface before it is finalised.
This PR introduces an experimental configuration called
`internal.share.acknowledgement.mode` which can be used to make the
application declare which kind of acknowledgement it wishes to use. We
plan to try out the configuration, assess whether it has helped, and
then create a proper consumer configuration that makes this area better.
That would require a lot of change in the tests, which explains why this
initial PR only has a small number of tests.
Reviewers: David Arthur <mumrah@gmail.com>
1. Updated JavaDoc to reflect that CreateTopicPolicy and AlterConfigPolicy run on the controller in KRaft mode.
2. Modified Behavioral Change Reference in the HTML docs to include this change.
3. add warning message to KafkaConfig if the config of broker node has policy configs
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The PR corrects the way the locks are released on exception. As
`partitionsAcquired` can be a reference to `topicPartitionData`, hence
the locks should released prior clearing `partitionsAcquired`.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
When regular expressions are resolved, they do not update the group by
topics data structure. Hence, topic changes (e.g. deletion) do not
trigger a rebalance of the group.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This PR reduces `maxInterval` for `resendExponentialBackoff` in
`BrokerLifecycleManager` class from `broker.session.timeout.ms` to half
of its value. Setting `maxInterval` to `broker.session.timeout.ms`
caused brokers to be fenced if a resend attempt occurred near the
timeout threshold, leading to unnecessary broker fencing.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Implements auto-topic creation when handling the streams group
heartbeat.
Inside KafkaApis, the handler for streamsGroupHeartbeat uses the result
of the streams group heartbeat inside the group coordinator to attempt
to create all missing internal topics using AutoTopicCreationManager.
CREATE TOPIC ACLs are checked. The unit tests class
AutoTopicCreationManagerTest is brought back (it was recently deleted
during a ZK removal PR), but testing only the kraft-related
functionality.
Reviewers: Bruno Cadonna <bruno@confluent.io>
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Remove kafka.cluster.Broker and BrokerEndPointNotAvailableException as they were used by zk path.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The PR implements the SharePartitionMetrics as defined in KIP-1103, with
one change. The metric `FetchLockRatio` is defined as `Meter` in KIP but
is implemented as `HIstogram`. There was a discussion about same on
KIP-1103 discussion where we thought that `FetchLockRatio` is
pre-aggregated but while implemeting the rate from `Meter` can go above
100 as `Meter` defines rate per time period. Hence it makes more sense
to implement metric `FetchLockRatio` as `Histogram`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch filters out the topic describe unauthorized topics from the
ConsumerGroupHeartbeat and ConsumerGroupDescribe response.
In ConsumerGroupHeartbeat,
- if the request has `subscribedTopicNames` set, we directly check the
authz in `KafkaApis` and return a topic auth failure in the response if
any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered and
we do it based on the acl of the consumer that triggered the refresh. If
any of the topic is denied, we filter it out from the resolved
subscription.
In ConsumerGroupDescribe, we check the authz of the coordinator
response. If any of the topic in the group is denied, we remove the
described info and add a topic auth failure to the described group.
(similar to the group auth failure)
Reviewers: David Jacot <djacot@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
TengYao Chi <kitingiao@gmail.com>
This change implements the basic RPC handling StreamsGroupHeartbeat and
StreamsGroupDescribe. This includes:
- Adding an option to enable streams groups on the broker
- Passing describe and heartbeats to the right shard of the group
coordinator
- The handler inside the GroupMetadatManager for StreamsGroupDescribe is
fairly trivial, and is included directly in this PR.
- The handler for StreamsGroupHeartbeat is complex and not included in
this PR yet. Instead, a UnsupportedOperationException is thrown.
However, the interface is already defined: The result of a
streamsGroupHeartbeat is a response, together with a list of internal
topics to be created.
The heartbeat implementation inside the `GroupMetadataManager`, which
actually implements the assignment / reconciliation logic, will come in
a follow-up PR. Also, automatic creation of internal topics will be
created in a follow-up PR.
Reviewers: Bill Bejeck <bill@confluent.io>
### About
The current `SimpleAssignor` in AK assigned all subscribed topic
partitions to all the share group members. This does not match the
description given in
[KIP-932](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-TheSimpleAssignor).
Here are the rules as mentioned in the KIP by which the assignment
should happen. We have changed the step 3 implementation here due to the
reasons
[described](https://github.com/apache/kafka/pull/18864#issuecomment-2659266502)
-
1. The assignor hashes the member IDs of the members and maps the
partitions assigned to the members based on the hash. This gives
approximately even balance.
2. If any partitions were not assigned any members by (1) and do not
have members already assigned in the current assignment, members are
assigned round-robin until each partition has at least one member
assigned to it.
3. We combine the current and new assignment. (Original rule - If any
partitions were assigned members by (1) and also have members in the
current assignment assigned by (2), the members assigned by (2) are
removed.)
### Tests
The added code has been verified with unit tests and the already present
integration tests.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, TaiJuWu <tjwu1217@gmail.com>
For the KRaft implementation there is a race between the network thread,
which read bytes in the log segments, and the KRaft driver thread, which
truncates the log and appends records to the log. This race can cause
the network thread to send corrupted records or inconsistent records.
The corrupted records case is handle by catching and logging the
CorruptRecordException. The inconsistent records case is handle by only
appending record batches who's partition leader epoch is less than or
equal to the fetching replica's epoch and the epoch didn't change
between the request and response.
For the ISR implementation there is also a race between the network
thread and the replica fetcher thread, which truncates the log and
appends records to the log. This race can cause the network thread send
corrupted records or inconsistent records. The replica fetcher thread
already handles the corrupted record case. The inconsistent records case
is handle by only appending record batches who's partition leader epoch
is less than or equal to the leader epoch in the FETCH request.
Reviewers: Jun Rao <junrao@apache.org>, Alyssa Huang <ahuang@confluent.io>, Chia-Ping Tsai <chia7712@apache.org>
The PR handles fetch for `compacted` topics. The fix was required only
when complete batch disappears from the topic log, and same batch is
marked re-available in Share Partition state cache. Subsequent log reads
will not result the disappeared batch in read response hence respective
batch will be left as available in the state cache.
The PR checks for the first fetched/read batch base offset and if it's
greater than the position from where the read occurred (fetch offset)
then if there exists any `available` batches in the state cache then
they will be archived.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit <adixit@confluent.io>
When a cluster is configured with a dynamic controller quorum, KRaft replica's endpoint are computed using the advertised.listeners property and not the quorum.controller.voters property. This change in the configuration makes it difficult to keeping all previous node configurations compatible with the new endpoint discovery functionality.
The least intrusive solution is to rely on Kafka's reverse hostname lookup when the hostname is not specified. The effective advertised controller listener now remove '0.0.0.0' hostname if the endpoint came from the listener configuration and not the advertised.listener configuration.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
The PR handles slicing of fetched records based on acquire response for
share fetch. There could be additional bytes fetched from log but
acquired offsets can be a subset, typically with `max fetch records`
configuration. Rather sending additional bytes of fetched data to client
we should slice the file and wire only needed batches.
Note: If the acquired offsets are within a batch then we need to send
the entire batch within the file record. Hence rather checking for
individual batches, PR finds the first and last acquired offset, and
trims the file for all batches between (inclusive) these two offsets.
Reviewers: Christo Lolov <lolovc@amazon.com>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
The PR does following:
1. Adds `fetchOffset` to `acquire` API in SharePartition.
2. Adds a ShareFetchPartitionData class efficiently handle the
propagation of fetchOffset information.
3. Updates SharePartitionTests to make common code so such improvements
does not require all tests changes for future PRs.
Reviewers: Andrew Schofield <aschofield@confluent.io>
PR implements the final set of ShareGroupMetrics,
RequestTopicPartitionsFetchRatio and TopicPartitionsAcquireTimeMs, as
defined in KIP-1103:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1103%3A+Additional+metrics+for+cooperative+consumption
Note: Metric `RequestTopicPartitionsFetchRatio` is calculated as
percentage as Histogram API doesn't record double.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit <adixit@confluent.io>
- ELR is enabled (ELRV_1) by default if the cluster is created with its bootstrap metadata version >= IBP_4_1_IV0.
- ELRV_1 can be manually enabled iff the metadata version is >= IBP_4_0_IV1.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>, David Jacot <djacot@confluent.io>
3 tests which were marked flaky in ShareConsumerTest do not have any
failure on trunk since the test was converted to use `ClusterTestExtensions`.
Reviewers: Sushant Mahajan <smahajan@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
The current Docker Hub documentation for Kafka is based on the use of static voters. Since Kafka 4.0 utilizes dynamic voters, users following the doc of docker hub may encounter unexpected behavior. Due to the limited time available for the 4.0.0 release, a simple and quick solution is to revert to using static voters within the Docker image. This can be achieved by adding a configuration file with static voter definitions to the kafka/docker folder, keeping it separate from the main kafka/config directory. This approach allows us to encourage the use of dynamic voters in typical deployments while maintaining compatibility within the Docker image.
Reviewers: Vedarth Sharma <142404391+VedarthConfluent@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
fix the following behavior changes.
1) in log4j 1, users can't change the logger by parent if the logger is declared by properties explicitly. For example, `org.apache.kafka.controller` has level explicitly in the properties. Hence, we can't use "org.apache.kafka=INFO" to change the level of `org.apache.kafka.controller` to INFO. By contrast, log4j2 allows us to change all child loggers by the parent logger.
2) in log4j2, we can change the level of root to impact all loggers' level. By contrast, log4j 1 can't.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Cleanup code to avoid rawtype, and add suppressions where necessary.
Change the build to fail on rawtype warning.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
3.3.0 was the first KRaft release that was deemed production-ready and also
when KIP-778 (KRaft to KRaft upgrades) landed. Given that, it's reasonable
for 4.x to only support upgrades from 3.3.0 or newer (the metadata version also
needs to be set to "3.3" or newer before upgrading).
Noteworthy changes:
1. `AlterPartition` no longer includes topic names, which makes it possible to
simplify `AlterParitionManager` logic.
2. Metadata versions older than `IBP_3_3_IV3` have been removed and
`IBP_3_3_IV3` is now the minimum version.
3. `MINIMUM_BOOTSTRAP_VERSION` has been removed.
4. Removed `isLeaderRecoverySupported`, `isNoOpsRecordSupported`,
`isKRaftSupported`, `isBrokerRegistrationChangeRecordSupported` and
`isInControlledShutdownStateSupported` - these are always `true` now.
Also removed related conditional code.
5. Removed default metadata version or metadata version fallbacks in
multiple places - we now fail-fast instead of potentially using an incorrect
metadata version.
6. Update `MetadataBatchLoader.resetToImage` to set `hasSeenRecord`
based on whether image is empty - this was a previously existing issue that
became more apparent after the changes in this PR.
7. Remove `ibp` parameter from `BootstrapDirectory`
8. A number of tests were not useful anymore and have been removed.
I will update the upgrade notes via a separate PR as there are a few things that
need changing and it would be easier to do so that way.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Justine Olshan <jolshan@confluen.io>, Ken Huang <s7133700@gmail.com>
- update reflection-config.json and resource-config.json to include log4j2 and jackson
- remove unused jackson scala library
- fix the incorrect path of log4j2.yaml
- adopt workaround (--standalone) to make this PR work and it will be fixed by KAFKA-18737)
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
- Fixed the RemoteIndexCacheTest that fails with caffeine > 3.1.1
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
The stale/invalid files that ends-with ".deleted" and ".tmp" should be cleaned when the broker gets restarted.
- fix the remote-index-cache test to use the logDir instead of topicDir
- fix the flaky test
Reviewers: Luke Chen <showuon@gmail.com>
In PR #18267, we removed old message format for cases in ConsumerWithLegacyMessageFormatIntegrationTest. Although test cases can pass, they don't fulfill original purpose. We can't send old message format since 4.0, so I change cases to append old records by ReplicaManager directly.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Frequently updating the trust store can cause unexpected termination of the AsyncConsumer background thread.
1. To resolve this issue, reuse the same AdminClient instead of recreating it.
2. Add error logging when fail to initialize resources for the consumer network thread.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KAFKA-16720 aims to add the support for the AlterShareGroupOffsets AdminClient. Key Changes in the PR:
1. Added handing of alterShareGroupOffsets() in KafkaAdminClient and introduce AlterShareGroupOffsetRequest/AlterShareGroupOffsetResponse/AlterShareGroupOffsetsOptions classes.
2. Corresponding test in KafkaAdminClientTest.
3. Added ALTER_SHARE_GROUP_OFFSETS API (will finish it in next PR and the share coordinator pieces)
Reviewers: poorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
During the transaction commit phase, it is normal to hit CONCURRENT_TRANSACTION error before the transaction markers are fully propagated. Instead of letting the client to retry the produce request, it is better to retry on the server side.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
Rewrite BrokerMetricNamesTest using ReplicaManager.MetricNames, ensuring that all metrics are always included. This helps prevent issues like PartitionsWithLateTransactionsCount not being correctly included in the test before.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The case testDescribeQuorumRequestToControllers shutdowns raft client but not the controller. This makes client has chance to send a request to the controller and get NOT_LEADER_OR_FOLLOWER error. However, if the raft client finishes shutdown before handling the request, the request will not be handled. Shutdown the controller before doing KafkaFuture#get for the client request, so we can make sure the request is handled by another controller eventually.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>
Sometimes we didn't get into abortable state before aborting, so the epoch didn't get bumped. Now we force abortable state with an attempt to send before aborting so the epoch bump occurs as expected.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
* KAFKA-18758: NullPointerException in shutdown following InvalidConfigurationException
Add checks for null in shutdown as BrokerLifecycleManager is not instantiaited if LogManager constructor throws an Exception