This PR adds the following metrics for each of the supported production
features (`metadata.version`, `kraft.version`, `transaction.version`,
etc.):
`kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=X`
`kafka.server:type=node-metrics,name=maximum-supported-level,feature-name=X`
`kafka.server:type=node-metrics,name=minimum-supported-level,feature-name=X`
Reviewers: Josep Prat <josep.prat@aiven.io>, PoAn Yang
<payang@apache.org>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Lan Ding
<isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
While walking through the source code I confirmed that the broker checks
`replica.fetch.min.bytes` exactly the same way it checks
`fetch.min.bytes`, so this patch updates the wording for both config
keys.
Co-authored-by: yangxuze <xuze_yang@163.com>
Reviewers: Luke Chen <showuon@gmail.com>
Creates the GetReplicaLogInfoRequest and GetReplicaLogInfoResponse RPCs.
Information returned by brokers through these RPCs will be used to aid
unclean recovery by selecting the longest logs.
Reviewers: Alyssa Huang <ahuang@confluent.io>, Calvin Liu <caliu@confluent.io>, Colin P. McCabe <cmccabe@apache.org>, TaiJuWu <tjwu1217@gmail.com>
Migrate ControllerMutationQuotaManager to Java implementation and move
to server module, including ClientQuotaManager and associated files.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When writing HTML, it's recommended to use the <code> element instead of
backticks for inline code formatting.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
issue: https://github.com/apache/kafka/pull/19687/files#r2094574178
Why:
- To improve performance by avoiding redundant temporary collections and
repeated method calls.
- To make the utility more flexible for inputs from both Java and Scala.
What:
- Refactored `createResponseConfig` in `ConfigHelper.scala` by
overloading the method to accept both Java maps and `AbstractConfig`.
- Extracted helper functions to `ConfigHelperUtils` in the server
module.
Reviewers: Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
Description
* Replace `org.apache.kafka.common.test.TestUtils` with
`org.apache.kafka.test.TestUtils` in outer package modules to
standardize test utility usage
* Move `waitUntilLeaderIsElectedOrChangedWithAdmin` method from
`org.apache.kafka.test.TestUtils` to `ClusterInstance` and refactor for
better code organization
* Add `org.apache.kafka.test.TestUtils` dependency to
`transaction-coordinator` import control
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Minor fix to correct the validation condition for GetTelemetryRequests.
Adds the corresponding tests as well.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Simplify Set initialization and reduce the overhead of creating extra
collections.
The changes mostly include:
- new HashSet<>(List.of(...))
- new HashSet<>(Arrays.asList(...)) / new HashSet<>(asList(...))
- new HashSet<>(Collections.singletonList()) / new
HashSet<>(singletonList())
- new HashSet<>(Collections.emptyList())
- new HashSet<>(Set.of())
This change takes the following into account, and we will not change to
Set.of in these scenarios:
- Require `mutability` (UnsupportedOperationException).
- Allow `duplicate` elements (IllegalArgumentException).
- Allow `null` elements (NullPointerException).
- Depend on `Ordering`. `Set.of` does not guarantee order, so it could
make tests flaky or break public interfaces.
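The first three exclusions above can be checked with a small sketch. The class and helper names below (`SetOfCaveats`, `thrownBy`) are hypothetical and only illustrate the JDK behaviour; they are not part of the change itself.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class SetOfCaveats {
    /** Returns the simple name of the exception thrown by the action, or "none". */
    static String thrownBy(Runnable action) {
        try {
            action.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Mutability: Set.of returns an unmodifiable set, so add() fails.
        System.out.println(thrownBy(() -> Set.of("a").add("b")));
        // Duplicates: rejected when the set is created.
        System.out.println(thrownBy(() -> Set.of("a", "a")));
        // Nulls: rejected when the set is created.
        System.out.println(thrownBy(() -> Set.of("a", null)));
        // The HashSet form tolerates duplicates, nulls, and later mutation.
        System.out.println(thrownBy(() -> {
            Set<String> s = new HashSet<>(Arrays.asList("a", "a", null));
            s.add("b");
        }));
    }
}
```

Call sites that rely on any of these behaviours keep the `HashSet` form.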
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Log segment closure results in right sizing the segment on disk along
with the associated index files.
This is especially important for TimeIndexes, where a failure to right
size may eventually cause log roll failures, leading to under-replication
and log cleaner failures.
This change uses `Utils.closeAll` which propagates exceptions, resulting
in an "unclean" shutdown. That would then cause the broker to attempt to
recover the log segment and the index on next startup, thereby avoiding
the failures described above.
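As a rough illustration of "close everything, then propagate", here is a minimal sketch. The `closeAll` helper below is a hypothetical stand-in written for this example and is not copied from `Utils.closeAll`; it assumes the intended behaviour is to attempt every close and rethrow the first failure with later ones attached as suppressed exceptions.

```java
import java.io.Closeable;
import java.io.IOException;

final class CloseAllSketch {
    /** Closes every resource; rethrows the first failure with later ones suppressed. */
    static void closeAll(Closeable... closeables) throws IOException {
        IOException first = null;
        for (Closeable c : closeables) {
            if (c == null) continue;
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) first = e;
                else first.addSuppressed(e);
            }
        }
        if (first != null) throw first;
    }

    public static void main(String[] args) {
        boolean[] closed = new boolean[2];
        try {
            closeAll(
                () -> closed[0] = true,
                () -> { throw new IOException("index resize failed"); },
                () -> closed[1] = true);
        } catch (IOException e) {
            // The failure surfaces to the caller instead of being swallowed.
            System.out.println("propagated: " + e.getMessage());
        }
        // The resources before and after the failing one were still closed.
        System.out.println(closed[0] && closed[1]);
    }
}
```

Because the exception reaches the caller, shutdown is not recorded as clean, which is what leads to recovery on the next startup.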
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
For ShareFetch requests, the fetch happens through a DelayedShareFetch
operation. A completed operation still holds a reference to the data
being sent in the response. The operation is watched under multiple
keys, i.e. DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey,
so when it is completed through one watched key, a reference to it
remains under the other watched key. The memory can therefore only be
freed once DelayedOperationPurgatory triggers a purge, which removes the
already completed operation from the remaining keys.
The purge depends on the config
`ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`,
so if its value is not smaller than the number of share fetch requests
whose data can consume the broker's entire memory, the broker can run
out of memory. This can also be avoided by using a lower fetch max bytes
per request, but that value is client-dependent and so cannot be relied
on to protect the broker.
This PR triggers completion on both watched keys, so the
DelayedShareFetch operation is removed from both keys, which frees the
broker memory as soon as the share fetch response is sent.
#### Testing
Tested with LocalTieredStorage, where the broker goes OOM after reading
some 8040 messages before the fix, with the default configurations
mentioned in the doc
[here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
After the fix, consumption continues without any issue and the memory is
released instantaneously.
Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield
<aschofield@confluent.io>
Move AddPartitionsToTxnManager to server module and convert to Java.
This patch moves AddPartitionsToTxnManager from the core module to the
server module, with its package updated from `kafka.server` to
`org.apache.kafka.server.transaction`. Additionally, several
configurations used by AddPartitionsToTxnManager are moved from
KafkaConfig.scala to AbstractKafkaConfig.java:
- brokerId
- requestTimeoutMs
- controllerListenerNames
- interBrokerListenerName
- interBrokerSecurityProtocol
- effectiveListenerSecurityProtocolMap
The next PR will move AddPartitionsToTxnManagerTest.scala to Java.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
Update catch to handle compression errors
Before:

After:
```
Sent message: KR Message 376
[kafka-producer-network-thread | kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
KR: Failed to compress telemetry payload for compression: zstd, sending
uncompressed data
Sent message: KR Message 377
```
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Before 4.1, API key 74 was `ListClientMetricsResources`. From 4.1
onwards, it is `ListConfigResources`. If a user sends a v0
ListConfigResources request to the broker, the request metric is not
recorded under `ListClientMetricsResources`. This PR records the
`ListClientMetricsResources` metric when the request is a v0
`ListConfigResources`.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
1. Move `LogReadResult` to server module.
2. Rewrite `LogReadResult` in Java.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, TengYao Chi <frankvicky@apache.org>
Use ClusterTest and Java to rewrite `EndToEndClusterIdTest` and move it
to the server module.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Move `LoggingController` to the server module and rewrite it in Java.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Follow up https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Migrate MinInSyncReplicasConfigTest to the server module
Reviewers: PoAn Yang <payang@apache.org>, Yung
<yungyung7654321@gmail.com>, TengYao Chi <frankvicky@apache.org>, Ken
Huang <s7133700@gmail.com>
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponse.java`.
* Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest`
v0 and v1 requests.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
The PR is a minor follow up on
https://github.com/apache/kafka/pull/19659.
KafkaApis.scala already has a check that denies new share-fetch-related
calls if share groups are not supported. Hence no new sessions will be
created, so the share-group-enabled flag in ShareSessionCache is not
needed, unless I am missing something.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
The root cause of the flakiness is a race condition between the worker
thread (the thread executing the test) and executor-testDelayedFuture
(the thread that should execute the callback).
It was fixed by using TestUtils#waitForCondition to wait until the
callback has completed.
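The shape of the fix can be sketched as follows. The `waitForCondition` helper here is a simplified, hypothetical stand-in written for this example (it returns a boolean instead of failing the test), and the executor setup is assumed rather than taken from DelayedFutureTest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class WaitForCallbackSketch {
    /** Polls until the condition holds or the timeout elapses. */
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) return true;
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean();
    }

    /** Completes a future on the calling thread and waits for its async callback. */
    static boolean callbackObserved() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean callbackRan = new AtomicBoolean(false);
        try {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.thenRunAsync(() -> callbackRan.set(true), executor);
            future.complete(null);
            // Checking callbackRan right here would race with the executor thread,
            // so the check is retried until it passes or times out.
            return waitForCondition(callbackRan::get, 5_000);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackObserved());
    }
}
```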
Test evidence:
The test was run 1000 times as a repeated test.
Results:
`~/p/kafka (bloku/kafka-19091) [1]> ./gradlew server:test --tests DelayedFutureTest --fail-fast > res.txt`
`~/p/kafka (bloku/kafka-19091)> grep FAILED res.txt`
`~/p/kafka (bloku/kafka-19091) [1]>`
res.txt:
`> Task :server:test`
`Gradle Test Run :server:test > Gradle Test Executor 14 > DelayedFutureTest > testDelayedFuture() > repetition 1 of 1000 PASSED`
...
`BUILD SUCCESSFUL in 37m`
Reviewers: Ken Huang <s7133700@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
The PR adds ShareGroupListener, which is triggered on group changes,
such as when a member leaves or the group becomes empty. Such events are
specific to the connection on the respective broker. These events help
clean up the state managed for the respective member or group in
various caches.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Move LeaderEndPoint to the server module.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
This PR creates a listener for `SharePartitionManager` to listen for
changes in the `ShareVersion` feature. If the feature is toggled, we need
to change the attributes in `SharePartitionManager` accordingly.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This PR moves SchedulerTest to the server module and rewrites it in Java.
Please also check the updated import control config!
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
1. Move `ForwardingManagerMetrics` and `ForwardingManagerMetricsTest` to
server module.
2. Rewrite them in Java.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The PR fixes an issue when ShareAcknowledgements are piggybacked on
ShareFetch. The current default configuration in clients sets `batch
size` and `max fetch records` as per the `max.poll.records` config,
which defaults to 500. This means all records in a single poll will be
fetched and acknowledged. Also, the default limit for inflight records
in a partition is 200, which means previously fetched records have to be
acknowledged before fetching another batch from the share partition.
The piggybacked share fetch and acknowledgement calls from KafkaApis are
async, and the responses are combined later. If the share fetch starts
waiting in purgatory because the inflight record limit has been
reached, then when startOffset moves as part of the acknowledgement, a
trigger should attempt to complete any pending share fetch requests in
purgatory. Otherwise the share fetch requests wait in purgatory until
they time out even though records are available, which degrades share
fetch performance.
The regular fetch has a single criterion for landing requests in
purgatory, the min bytes criterion, so any produce to the respective
topic partition triggers a check of pending fetch requests. But share
fetch can wait in purgatory for multiple reasons: 1) Min bytes 2)
Inflight records exhaustion 3) Share partition fetch lock competition.
The trigger already happens for 1, and the current PR fixes 2. We will
investigate further whether any handling is required for 3.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
Replace all applicable `.stream().forEach()` calls in the codebase with
just `.forEach()`.
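A minimal before/after sketch of the pattern, using made-up names (`ForEachSketch`, `viaStream`, `direct`) purely for illustration:

```java
import java.util.ArrayList;
import java.util.List;

final class ForEachSketch {
    /** Before: creates a Stream only to iterate over the collection. */
    static List<String> viaStream(List<String> topics) {
        List<String> seen = new ArrayList<>();
        topics.stream().forEach(seen::add);
        return seen;
    }

    /** After: Iterable#forEach visits the same elements without the Stream. */
    static List<String> direct(List<String> topics) {
        List<String> seen = new ArrayList<>();
        topics.forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders", "payments");
        System.out.println(viaStream(topics).equals(direct(topics)));
    }
}
```

The replacement only applies where the stream has no intermediate operations such as `filter` or `map`.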
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Until now, the broker only attempted to evict share sessions when the
share session cache was full and a new session was trying to get
registered. With the changes in this PR, whenever a share consumer
disconnects from the broker, the corresponding share session is evicted
from the cache.
Note - `connectAndReceiveWithoutClosingSocket` has been introduced in
`GroupCoordinatorBaseRequestTest`. This method creates a socket
connection, sends the request, and receives a response, but does not
close the connection. Instead, these sockets are stored in a ListBuffer
`openSockets` and are closed in the tearDown method after each test is
run. Also, all the `connectAndReceive` calls in
`ShareFetchAcknowledgeRequestTest` have been replaced by
`connectAndReceiveWithoutClosingSocket`, because these tests depend on
the share sessions persisting on the broker once registered. With the
new code, as soon as the socket connection is closed, the broker assumes
a connection drop, leading to session eviction.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Add new StreamsGroupFeature, disabled by default, and add "streams" as
default value to `group.coordinator.rebalance.protocols`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<david.jacot@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>,
Justine Olshan <jolshan@confluent.io>, Andrew Schofield
<aschofield@confluent.io>, Jun Rao <jun@confluent.io>
The PluginMetrics#metricName method is updated: the type of the tags
parameter changes from Map to LinkedHashMap.
This change is necessary because the order of metric tags is important:
1. If the tag order is inconsistent, identical metrics may be treated as
distinct ones by the metrics backend
2. KAFKA-18390 is updating metric naming to use LinkedHashMap. For
consistency, we should follow the same approach here.
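To see why tag order matters, consider a sketch that renders a metric identifier from its tags. The `render` format and the `tags` helper are hypothetical and do not reflect how Kafka or any metrics backend actually builds names; they only show that iteration order leaks into the result.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class TagOrderSketch {
    /** Builds a LinkedHashMap from alternating key/value arguments, keeping their order. */
    static LinkedHashMap<String, String> tags(String... keyValues) {
        LinkedHashMap<String, String> tags = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            tags.put(keyValues[i], keyValues[i + 1]);
        }
        return tags;
    }

    /** Renders name and tags in the map's iteration order (illustrative format only). */
    static String render(String name, Map<String, String> tags) {
        return name + tags.entrySet().stream()
            .map(e -> "," + e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // Same tags, different order: the rendered identifiers differ.
        System.out.println(render("requests", tags("client", "a", "topic", "t")));
        System.out.println(render("requests", tags("topic", "t", "client", "a")));
    }
}
```

Requiring a `LinkedHashMap` in the signature makes the caller's chosen order explicit instead of leaving it to whatever `Map` implementation happens to be passed in.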
Reviewers: TengYao Chi <frankvicky@apache.org>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, lllilllilllilili
This PR marks the records as non-nullable for ShareFetch.
This follows the changes made for Fetch in
https://github.com/apache/kafka/pull/18726; some work for ShareFetch
was done in https://github.com/apache/kafka/pull/19167. I tested
marking `records` as non-nullable in ShareFetch, which required
additional handling. That handling is included in this PR.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>, PoAn Yang
<payang@apache.org>
The generated response data classes take Readable as input to parse the
Response. However, the associated response objects take ByteBuffer as
input and thus convert them to Readable using `new ByteBufferAccessor`
call.
This PR changes the parse method of all the response classes to take the
Readable interface instead so that no such conversion is needed.
To support parsing the ApiVersionsResponse twice for different versions,
this change adds the `slice` method to the Readable interface.
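The idea behind a slice-style method can be sketched with a heavily reduced, hypothetical interface. `MiniReadable` and `BufferReadable` below are invented for this example and are not the real Readable or ByteBufferAccessor; the sketch assumes `slice` behaves like `ByteBuffer#slice`, giving an independent view so a first parse attempt does not consume the caller's position.

```java
import java.nio.ByteBuffer;

final class SliceSketch {
    /** Hypothetical, reduced stand-in for a Readable-style interface. */
    interface MiniReadable {
        short readShort();
        MiniReadable slice();
    }

    /** Backs the interface with a ByteBuffer. */
    static final class BufferReadable implements MiniReadable {
        private final ByteBuffer buf;

        BufferReadable(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public short readShort() {
            return buf.getShort();
        }

        @Override
        public MiniReadable slice() {
            // The slice starts at the current position and tracks its own position.
            return new BufferReadable(buf.slice());
        }
    }

    /** Reads the same field twice: once from a slice, once from the original. */
    static short[] parseTwice(MiniReadable readable) {
        short first = readable.slice().readShort(); // does not advance the original
        short second = readable.readShort();        // still starts at the same byte
        return new short[] {first, second};
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2);
        buf.putShort((short) 74);
        buf.flip();
        short[] values = parseTwice(new BufferReadable(buf));
        System.out.println(values[0] == values[1]);
    }
}
```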
Reviewers: José Armando García Sancio <jsancio@apache.org>, Truc Nguyen
<trnguyen@confluent.io>, Aadithya Chandra <aadithya.c@gmail.com>
Currently the share session cache is designed like the fetch session
cache. If the cache is full and a new share session is trying to get
initialized, then the sessions which haven't been touched for more than
2 minutes are evicted. This wouldn't be right for share sessions, as the
members also hold locks on the acquired records, and session eviction
would mean those locks will need to be dropped and the corresponding
records re-delivered. This PR removes the time-based eviction logic for
share sessions.
Refer: [KAFKA-19159](https://issues.apache.org/jira/browse/KAFKA-19159)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1. Remove `RemoteLogManager#startup` and
`RemoteLogManager#onEndpointCreated`
2. Move endpoint creation to `BrokerServer`
3. Move `RemoteLogMetadataManager#configure` and
`RemoteLogStorageManager#configure` to RemoteLogManager constructor
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Separates metadata-related configurations from the `KRaftConfigs` into
the `MetadataLogConfig` class.
Previously, metadata-related configs were placed in `KRaftConfigs`,
which mixed server-related configs (like process.roles) with
metadata-specific ones (like metadata.log.*), leading to confusion and
tight coupling.
In this PR:
- Extract metadata-related config definitions and variables from
`KRaftConfigs` into `MetadataLogConfig`.
- Move `node.id` out of `MetadataLogConfig` into the `KafkaMetadataLog`
constructor to avoid redundant config references.
- Leave server-related configurations in `KRaftConfigs`, consistent with
its role.
This separation makes `KafkaConfig` and `KRaftConfigs` cleaner, and
aligns with the goal of having a dedicated MetadataLogConfig class for
managing metadata-specific configurations.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This also adds metrics to StandardAuthorizer
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu
<tjwu1217@gmail.com>
This patch adds ACL support for 2PC as a part of KIP-939.
A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT
((byte) 15). When InitProducerId comes with enable2Pc=true, it must
have both the WRITE and TWO_PHASE_COMMIT operations allowed on the
transactional id resource.
The kafka-acls.sh tool is going to support a new --operation
TwoPhaseCommit.
Reviewers: Artem Livshits <alivshits@confluent.io>, PoAn Yang
<poan.yang@suse.com>, Justine Olshan <jolshan@confluent.io>
1. Migrate DelegationTokenManager to server module.
2. Rewrite DelegationTokenManager in Java.
3. Move DelegationTokenManagerConfigs out of KafkaConfig.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
The `VoidEvent` class provides a singleton object, but nobody uses it. I
think we should make the `VoidEvent` constructor private and only use
the singleton.
Use `UnaryOperator<OptionalLong>` instead of
`Function<OptionalLong,OptionalLong>`.
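Both points can be illustrated with a short sketch. The class below is a simplified stand-in: the `INSTANCE` field name and the `INCREMENT` operator are assumptions made for this example, not the actual members of `VoidEvent` or its callers.

```java
import java.util.OptionalLong;
import java.util.function.UnaryOperator;

final class VoidEventSketch {
    /** Singleton with a private constructor, so no second instance can be created. */
    static final class VoidEvent {
        static final VoidEvent INSTANCE = new VoidEvent();

        private VoidEvent() {
        }
    }

    /**
     * UnaryOperator<T> is a Function<T, T>, so it states "same input and output type"
     * more directly than Function<OptionalLong, OptionalLong>.
     */
    static final UnaryOperator<OptionalLong> INCREMENT =
        value -> value.isPresent() ? OptionalLong.of(value.getAsLong() + 1) : value;

    public static void main(String[] args) {
        System.out.println(VoidEvent.INSTANCE == VoidEvent.INSTANCE);
        System.out.println(INCREMENT.apply(OptionalLong.of(41)).getAsLong());
    }
}
```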
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Move LogCleaner and related classes to storage module and rewrite in
Java.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Jun Rao <junrao@gmail.com>
This PR proposes a switch to enable share groups for 4.1 (preview) and
4.2 (GA).
* `share.version=1` to indicate that share groups are enabled. This is
used as the switch for turning share groups on and off.
In 4.1, the default will be `share.version=0`. Then a user wanting to
evaluate the preview of KIP-932 would use `bin/kafka-features.sh
--bootstrap-server xxxx upgrade --feature share.version=1`.
In 4.2, the default will be `share.version=1`.
Reviewers: Jun Rao <junrao@gmail.com>
JIRA: KAFKA-18935 This patch ensures the broker will not return null
records in FetchResponse. For more details, please refer to the
ticket.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai
<chia7712@gmail.com>, Jun Rao <junrao@gmail.com>