Commit Graph

16231 Commits

Evanston Zhou 7ffd6934ad
MINOR: Add example integration test commands to README (#20413)
Adds example commands for running integration tests from the command
line.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-27 13:59:31 +08:00
Abhijeet Kumar 8d93d1096c
KAFKA-17108: Add EarliestPendingUpload offset spec in ListOffsets API (#16584)
This is the first part of the implementation of

[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)

The purpose of this pull request is for the broker to start returning
the correct offset when it receives -6 as a timestamp in a ListOffsets
API request.

Added unit tests for the new timestamp.
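
For orientation, a hedged sketch of how a client might request the new
spec through the Admin API once it is exposed — the accessor name
`OffsetSpec.earliestPendingUpload()` is hypothetical here, and the
bootstrap address and topic are illustrative:

```
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class EarliestPendingUploadSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("tiered-topic", 0);
            // Hypothetical accessor for the new spec; on the wire it maps to
            // timestamp -6 in the ListOffsets request.
            ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.earliestPendingUpload()));
            System.out.println(result.partitionResult(tp).get().offset());
        }
    }
}
```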

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
2025-08-27 08:34:31 +05:30
jimmy 5fcbf3d3b1
KAFKA-18853 Add documentation to remind users to use valid LogLevelConfig constants (#20249)
This PR aims to add documentation to the `alterLogLevelConfigs` method
to remind users to use valid LogLevelConfig constants.
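
For context, the commonly used path for changing broker log levels goes
through `incrementalAlterConfigs` on a `BROKER_LOGGER` resource; a
minimal sketch using a `LogLevelConfig` constant rather than a
free-form string (broker id and logger name are illustrative):

```
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.LogLevelConfig;

public class SetLoggerLevelSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Target the logger config of broker 1 (broker id is illustrative).
            ConfigResource brokerLogger = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1");
            // Use a LogLevelConfig constant instead of a free-form string like "debug".
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.DEBUG_LOG_LEVEL),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(brokerLogger, List.of(op));
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}
```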

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-27 10:52:02 +08:00
Ken Huang 08057eac53
KAFKA-18600 Cleanup NetworkClient zk related logging (#18644)
This PR removes the ZooKeeper-related logging within NetworkClient to
reduce noise and streamline the client code.

Reviewers: Ismael Juma <ismael@juma.me.uk>, David Arthur
 <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-27 03:51:28 +08:00
Ken Huang a9b2a6d9b6
MINOR: Optimize the entriesWithoutErrorsPerPartition when errorResults is empty (#20410)
If `errorResults` is empty, there’s no need to create a new
`entriesPerPartition` map.
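
A minimal sketch of the fast path this describes, with hypothetical
generic names standing in for the broker's actual types:

```
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the described optimization; the real
// code lives in the broker's produce path with its own types.
final class PartitionFilterSketch {
    static <K, V> Map<K, V> entriesWithoutErrors(Map<K, V> entriesPerPartition, Map<K, ?> errorResults) {
        if (errorResults.isEmpty()) {
            // Fast path: nothing failed, so reuse the original map instead of copying it.
            return entriesPerPartition;
        }
        Map<K, V> filtered = new HashMap<>(entriesPerPartition);
        filtered.keySet().removeAll(errorResults.keySet());
        return filtered;
    }
}
```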

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-27 03:11:35 +08:00
Apoorv Mittal 49ee1fb4f9
KAFKA-19632: Handle overlap batch on partition re-assignment (#20395)
The PR fixes the batch alignment issue when partitions are re-assigned.
During the initial read of state, the batches can be broken
arbitrarily. Say the start offset is 10 and the cache contains a
[15-18] batch at initialization. When a fetch happens at offset 10 and
the fetched batch contains 10 records, i.e. [10-19], the correct
batches will be created as long as maxFetchRecords is greater than 10.
But if maxFetchRecords is less than 10, the last offset of the batch is
determined, which will be 19. Hence the acquire method will incorrectly
create a [10-19] batch while [15-18] already exists. The check below is
required to resolve the issue:
```
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
    lastAcquiredOffset = lastOffset;
}
```

While testing other cases, further issues were found with updating the
gap offset, acquiring records prior to the share partition's end
offset, and determining the next fetch offset with compacted topics.
All of these issues arise mainly during the initial read window after
partition re-assignment.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
 <adixit@confluent.io>, Chirag Wadhwa <cwadhwa@confluent.io>
2025-08-26 13:25:57 +01:00
Chang-Chi Hsu 71fdab1c5d
MINOR: describeTopics should pass the timeout to the describeCluster call (#20375)
This PR ensures that describeTopics correctly propagates its timeoutMs
setting to the underlying describeCluster call. Integration tests were
added to verify that the API now fails with a TimeoutException when
brokers do not respond within the configured timeout.
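
A minimal sketch of the propagation under test, assuming a simplified
helper rather than the real KafkaAdminClient internals:

```
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;

final class TimeoutPropagationSketch {
    // Build the options for the nested describeCluster call from the caller's
    // describeTopics options instead of falling back to the default timeout.
    static DescribeClusterOptions clusterOptionsFrom(DescribeTopicsOptions topicOptions) {
        return new DescribeClusterOptions().timeoutMs(topicOptions.timeoutMs());
    }
}
```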

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-26 19:38:53 +08:00
Kirk True 4e0d8c984b
MINOR: renamed testAsyncConsumerClassicConsumerSubscribeInvalidTopicCanUnsubscribe to align test case (#20407)
`testAsyncConsumerClassicConsumerSubscribeInvalidTopicCanUnsubscribe`
does not align with the test case. This patch renames the test to
describe the test case more precisely.

Reviewers: TengYao Chi <frankvicky@apache.org>
2025-08-26 17:00:37 +08:00
Abhijeet Kumar 614bc3a19d
KAFKA-17344: Add empty replica FollowerFetch tests (#16884)
Add Unit Tests for an empty follower fetch for various Leader states.

| TieredStorage Enabled | Leader Log Start Offset | Leader Local Log Start Offset | Leader Log End Offset | Remarks |
|-----------------------|-------------------------|-------------------------------|-----------------------|---------|
| N | 0 | - | 200 | - |
| N | 10 | - | 200 | - |
| Y | 0 | 200 | 200 | No segments deleted locally |
| Y | 0 | 200 | 100 | Segments uploaded and deleted locally |
| Y | 0 | 200 | 200 | All segments deleted locally |
| Y | 10 | 10 | 200 | No segments deleted locally |
| Y | 10 | 100 | 200 | Segments uploaded and deleted locally |
| Y | 10 | 200 | 200 | All segments deleted locally |

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
2025-08-26 14:19:11 +05:30
jimmy d2f162a071
MINOR: kafka-streams-groups.sh should fail quickly if the partition leader is unavailable (#20271)
This PR applies the same partition leader check to `StreamsGroupCommand`
as in `ShareGroupCommand` and `ConsumerGroupCommand`, to avoid the
command execution timing out.

Reviewers: Lucas Brutschy <lucasbru@apache.org>
2025-08-26 10:08:35 +02:00
Lucas Brutschy f621a635c1
KAFKA-19570: Implement offline migration for streams groups (#20288)
Offline migration essentially preserves offsets and nothing else. So,
effectively, write tombstones for the classic group type when a streams
heartbeat is sent with the group ID of an empty classic group, and
write tombstones for the streams group type when a classic consumer
attempts to join with the group ID of an empty streams group.

Reviewers: Bill Bejeck <bbejeck@apache.org>, Sean Quah
 <squah@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>
2025-08-26 10:05:30 +02:00
Mickael Maison 30ffd42b26
MINOR: Cleanups in the release scripts (#20308)
A bunch of cleanups in the release scripts

Reviewers: Luke Chen <showuon@gmail.com>
2025-08-26 09:57:49 +02:00
PoAn Yang 5bbc421a13
MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions (#20370)
This is a follow-up PR for https://github.com/apache/kafka/pull/19699.

* Update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions
* Update the `TxnTransitMetadata` comment, because it's not immutable.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Justine Olshan
 <jolshan@confluent.io>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
 Tsai <chia7712@gmail.com>
2025-08-26 10:36:45 +08:00
PoAn Yang b2c1a0fb9f
KAFKA-18841: Enable to test docker image locally (#19028)
### Case 1: no --kafka-url and --kafka-archive

Should fail. One of the arguments (--kafka-url/--kafka-archive) is required.

```
> python docker_build_test.py apache/kafka --image-tag KAFKA-18841
--image-type jvm --build
usage: docker_build_test.py [-h] [--image-tag TAG] [--image-type
{jvm,native}] [--build] [--test] (--kafka-url KAFKA_URL |
--kafka-archive KAFKA_ARCHIVE) image
docker_build_test.py: error: one of the arguments --kafka-url/-u
--kafka-archive/-a is required
```

### Case 2: --kafka-url with native

```
> python docker_build_test.py apache/kafka --image-tag KAFKA-18841
--image-type native --kafka-url
https://dist.apache.org/repos/dist/dev/kafka/4.0.0-rc0/kafka_2.13-4.0.0.tgz
--build
```

### Case 3: --kafka-url with jvm

```
> python docker_build_test.py apache/kafka --image-tag KAFKA-18841
--image-type jvm --kafka-url
https://dist.apache.org/repos/dist/dev/kafka/4.0.0-rc0/kafka_2.13-4.0.0.tgz
--build
```

### Case 4: --kafka-archive with native

```
> ./gradlew clean releaseTarGz
> cd docker
> python docker_build_test.py apache/kafka --image-tag KAFKA-18841
--image-type native --kafka-archive
</absolute/path/to/core/build/distributions/kafka_2.13-4.1.0-SNAPSHOT.tgz>
--build
```

### Case 5: --kafka-archive with jvm

```
> ./gradlew clean releaseTarGz
> cd docker
> python docker_build_test.py apache/kafka --image-tag KAFKA-18841
--image-type jvm --kafka-archive
</absolute/path/to/core/build/distributions/kafka_2.13-4.1.0-SNAPSHOT.tgz>
--build
```

Reviewers: Vedarth Sharma <vesharma@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-08-26 10:30:14 +08:00
Kuan-Po Tseng 0242d1c58a
MINOR: update kraft dynamic voter set doc (#20401)
Update the KRaft dynamic voter set documentation. In Kafka 4.1, we
introduced a powerful new feature that enables seamless migration from a
static voter set to a dynamic voter set.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-26 01:31:07 +08:00
Federico Valeri f97b95c60a
KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221)
This patch adds the `--include` argument to the ConsumerPerformance tool.

ConsoleConsumer and ConsumerPerformance serve different purposes but
share common functionality for message consumption. Currently, there's
an inconsistency in their command-line interfaces:

- ConsoleConsumer supports an --include argument that allows users to
specify a regular expression pattern to filter topics for consumption
- ConsumerPerformance lacks this topic filtering capability, requiring
users to specify a single topic explicitly via --topic argument

This inconsistency creates two problems:

- Similar tools should provide similar topic selection capabilities for
better user experience
- Users cannot test consumer performance across multiple topics or
dynamically matching topic sets, making it difficult to test realistic
scenarios
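
As a point of reference, regex-based topic selection is something the
consumer client already exposes directly; a sketch of the pattern
subscription that an `--include`-style option maps onto (pattern and
configs here are illustrative):

```
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PatternSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "perf-test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to every topic matching the pattern, analogous to --include "perf-.*"
            consumer.subscribe(Pattern.compile("perf-.*"));
        }
    }
}
```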

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-25 04:15:37 +08:00
Kuan-Po Tseng ecd5b4c157
MINOR: enhance DescribeClusterResponse ControllerId description (#20400)
Enhance the description of ControllerId in DescribeClusterResponse.json.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-25 02:41:02 +08:00
Michael Morris 732b22daff
MINOR: Upgrade log4j to version 2.25.1 (#20132)
Upgrade log4j to version 2.25.1

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-08-25 02:35:12 +08:00
Chang-Chi Hsu c01723340f
MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra (#20199)
**Changes**: Use ClusterTest to rewrite
EligibleLeaderReplicasIntegrationTest.

**Validation**: Run the test 50 times locally with consistent success.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-24 01:35:20 +08:00
Kaushik Raina b4bf0bf693
KAFKA-19506 Implement dynamic compression type selection and fallback for client telemetry (#20144)
#### Summary
This PR implements dynamic compression type selection and a fallback
mechanism for client telemetry to handle cases where compression
libraries are not available on the client classpath.

#### Problem
Currently, when a compression library is missing (e.g.,
NoClassDefFoundError), the client telemetry system catches the generic
Throwable but doesn't learn from the failure. This means the same
unsupported compression type will be attempted on every telemetry push.

#### Solution
This PR introduces a comprehensive fallback mechanism:
- Specific Exception Handling: Replace generic Throwable catching with
specific exceptions (IOException, NoClassDefFoundError)
- Unsupported Compression Tracking: Add unsupportedCompressionTypes
collection to track compression types that have failed due to missing
libraries
- Dynamic Selection: Enhance
ClientTelemetryUtils.preferredCompressionType() to accept an unsupported
types parameter and filter out known problematic compression types
- Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe
access to the unsupported types collection
- Improved Logging: Include exception details in log messages for better
debugging

#### Key Changes
- Modified createPushRequest() to track failed compression types in
unsupportedCompressionTypes
- Updated ClientTelemetryUtils.preferredCompressionType() to filter out
unsupported types
- Enhanced exception handling with specific exception types instead of
Throwable
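
A condensed sketch of the mechanism described above, with the class
shape simplified from the real ClientTelemetryReporter and
ClientTelemetryUtils code:

```
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.record.CompressionType;

final class CompressionFallbackSketch {
    // Thread-safe set of compression types that failed due to missing libraries.
    private final Set<CompressionType> unsupportedCompressionTypes = ConcurrentHashMap.newKeySet();

    // Pick the first broker-accepted type that hasn't previously failed.
    CompressionType preferredCompressionType(List<CompressionType> acceptedTypes) {
        return acceptedTypes.stream()
            .filter(t -> !unsupportedCompressionTypes.contains(t))
            .findFirst()
            .orElse(CompressionType.NONE);
    }

    // Called when compressing with `type` throws e.g. NoClassDefFoundError,
    // so the same type is never attempted again.
    void recordFailure(CompressionType type) {
        unsupportedCompressionTypes.add(type);
    }
}
```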

#### Testing
- Added appropriate Unit tests
- Testing apache kafka on local logs:
```
✗ cat ~/Desktop/kafka-client.log | grep "
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter"
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry subscription request with client instance id
AAAAAAAAAAAAAAAAAAAAAA
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_NEEDED to
SUBSCRIPTION_IN_PROGRESS
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Telemetry subscription push interval value from broker was 5000; to
stagger requests the first push interval is being adjusted to 4551
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Updating subscription - subscription:
ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA,
subscriptionId=1650084878, pushIntervalMs=5000,
acceptedCompressionTypes=[zstd, lz4, snappy, none],
deltaTemporality=true,
selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398};
intervalMs: 4551, lastRequestMs: 1752739012639
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Client telemetry registered with client instance id:
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:56:57:224 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library zstd not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:56:57:295 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:02:296 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:02:297 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:02:300 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library lz4 not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:02:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:07:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:07:330 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:07:331 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library snappy not found, sending uncompressed data
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
        at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:07:344 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:12:400 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:17:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:22:508 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:27:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:32:578 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:37:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:42:646 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:47:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:52:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:57:765 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
```

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-08-24 01:19:19 +08:00
Chang-Chi Hsu eba9839776
MINOR: Remove fetchQuotaMetrics and copyQuotaMetrics on close (#20394)
- Changes: Remove fetchQuotaMetrics and copyQuotaMetrics in RemoteLogManager on close

from: https://github.com/apache/kafka/pull/20342#discussion_r2290612736

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
2025-08-23 10:04:58 +05:30
Yunchi Pang 511818e027
MINOR: Change LogCompactionTester topics parameter to Set<String> (#20372)
issue: https://github.com/apache/kafka/pull/19905#discussion_r2282202312

What: Change `String[] topics` to `Set<String> topics` throughout
`LogCompactionTester`. Why: `Set<String>` is more modern and reduces the
need for array-to-collection conversions.

Reviewers: Ken Huang  <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Jhen-Yung Hsu  <jhenyunghsu@gmail.com>, Lan Ding
 <isDing_L@163.com>, Kuan-Po Tseng  <brandboat@gmail.com>, Chia-Ping
 Tsai <chia7712@gmail.com>
2025-08-23 03:28:55 +08:00
Ken Huang 47bb46c10d
KAFKA-19582 the current assignments shown by ReassignPartitionsCommand should include the log directories (#20319)
The ReassignPartitionsCommand shows the topic replicas on each broker.
When using the --generate command, it returns the current partition
replica assignment. However, the log directory for each current replica
is always shown as any. This makes it impossible for users to determine
which specific log directory is being used by each replica. Therefore,
we should fix this behavior.

```
Current partition replica assignment
{
  "version": 1,
  "partitions": [
    {
      "topic": "test1",
      "partition": 0,
      "replicas": [
        4,
        2
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    }
  ]
}
```

With this PR:
```
Current partition replica assignment
{
  "version": 1,
  "partitions": [
    {
      "topic": "test1",
      "partition": 0,
      "replicas": [
        4,
        2
      ],
      "log_dirs": [
        "/tmp/kraft-broker-logs234",
        "/tmp/kraft-broker-logs"
      ]
    }
  ]
}
```

Reviewers: PoAn Yang <payang@apache.org>, Jhen-Yung Hsu
 <jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-08-23 02:57:00 +08:00
Andrew Schofield c565ba1a04
KAFKA-19598: Command-line arguments for producer perf test (#20361)
This implements KIP-1147 for kafka-producer-perf-test.sh.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-23 01:14:14 +08:00
Sanskar Jhajharia de92b200fc
KAFKA-18699 Cleanup Metadata Module (#20346)
https://issues.apache.org/jira/browse/KAFKA-18699

This PR aims at cleaning up the `metadata` module further by getting rid
of some extra code which can be replaced by `record` classes.

Reviewers: Ken Huang <s7133700@gmail.com>, Ming-Yen Chung
<mingyen066@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-23 00:57:44 +08:00
Jinhe Zhang b8bd50a06d
MINOR: Port RestoreIntegrationTest to the new protocol (#20347)
The test uses a regular consumer to commit offsets. The new protocol
requires a streams consumer since we are using streams groups; otherwise
we run into group ID conflicts.

Following the addition of the KafkaAdmin interface for setting offsets,
a Kafka Admin client is created and used to set the committed offsets
instead of instantiating a consumer.

Also enables all tests for the streams new protocol.
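
For reference, a minimal sketch of committing group offsets through the
Admin client (group, topic, and offset values are illustrative, not the
test's actual values):

```
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitViaAdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Set the group's committed offset without instantiating a consumer.
            admin.alterConsumerGroupOffsets("restore-test-group",
                Map.of(new TopicPartition("input-topic", 0), new OffsetAndMetadata(0L))).all().get();
        }
    }
}
```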

Reviewers: Alieh Saeedi<asaeedi@confluent.io>, Kirk True
 <ktrue@confluent.io>, Matthias Sax <mjsax@apache.org>, Bill Bejeck
 <bbejeck@apache.org>
2025-08-22 11:39:43 -04:00
Chirag Wadhwa def5f16c33
KAFKA-19630: Reordered OR operands in archiveRecords method for SharePartition (#20391)
As per the current implementation in archiveRecords, when the LSO is
updated, if we have multiple record batches before the new LSO, then
only the first one gets archived. This is because of the following lines
of code:

`isAnyOffsetArchived = isAnyOffsetArchived ||
archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1,
initialState);`

`isAnyBatchArchived = isAnyBatchArchived ||
archiveCompleteBatch(inFlightBatch, initialState);`

The first record / batch will make `isAnyOffsetArchived` /
`isAnyBatchArchived` true, after which this line of code will
short-circuit and the methods `archivePerOffsetBatchRecords` /
`archiveCompleteBatch` will not be called again. This PR changes the
order of the expressions so that the short-circuit does not prevent
archiving all the required batches.
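
A sketch of the reordered expressions implied by the description above
(not the PR's verbatim diff): the archive call is evaluated first on
every iteration, and the flag is folded in afterwards.

```
isAnyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch,
startOffset, endOffset - 1, initialState) || isAnyOffsetArchived;

isAnyBatchArchived = archiveCompleteBatch(inFlightBatch, initialState)
|| isAnyBatchArchived;
```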

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-08-22 09:23:12 +01:00
Jhen-Yung Hsu eeb6a0d981
KAFKA-19618 the `record-size` and `throughput` arguments don't work in TestRaftServer (#20379)
The `record-size` and `throughput` arguments don’t work in
`TestRaftServer`. The `recordsPerSec` and `recordSize` values are always
hard-coded.

- Fix the issue of `recordsPerSec` and `recordSize` values being
hard-coded
- Add a "Required" description to the command-line options to make it
clear to users.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-22 01:43:52 +08:00
Sanskar Jhajharia 0202721b4c
MINOR: Cleanups in jmh-benchmarks module (#20374)
This PR aims at cleaning up the `jmh-benchmarks` module further by
getting rid of some extra code which can be replaced by `record` classes.

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-08-20 22:20:54 +08:00
Kamal Chandraprakash a056672f7c
KAFKA-19599: Reduce the frequency of ReplicaNotAvailableException thrown to clients when RLMM is not ready (#20345)
During broker restarts, the topic-based RemoteLogMetadataManager (RLMM)
constructs its state by reading the internal `__remote_log_metadata`
topic. When the partition is not ready to perform remote storage
operations, a ReplicaNotAvailableException is thrown back to the
consumer, and the client retries the request immediately.

This results in a lot of FETCH requests on the broker and ties up the
request handler threads. A CountDownLatch is used to reduce the
frequency of ReplicaNotAvailableException thrown back to the clients,
which improves request handler thread usage on the broker.

Previously, for one consumer, when the RLMM was not ready for a
partition, the broker received ~9K FetchConsumer requests/sec. With
this patch, the number of FETCH requests is reduced by 95%, to 600/sec.
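
A schematic sketch of the latch-based gating described here — the class
shape, names, and wait duration are illustrative, not the PR's actual
code:

```
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;

final class RlmmReadinessGate {
    // Counted down once the RLMM has finished bootstrapping the partition's state.
    private final CountDownLatch initialized = new CountDownLatch(1);

    void markInitialized() {
        initialized.countDown();
    }

    void awaitOrThrow(long maxWaitMs) throws InterruptedException {
        // Parking the request briefly instead of failing immediately spaces out
        // the client's retries, cutting the flood of FETCH requests.
        if (!initialized.await(maxWaitMs, TimeUnit.MILLISECONDS)) {
            throw new ReplicaNotAvailableException("Remote log metadata not yet available");
        }
    }
}
```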

Reviewers: Lan Ding <isDing_L@163.com>, Satish Duggana
 <satishd@apache.org>
2025-08-20 09:48:57 +05:30
Chih-Yuan Chien 84d817f40e
MINOR: Remove PartitionState (#20377)
Remove unused PartitionState. It was unused after #7222.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
 <payang@apache.org>
2025-08-20 08:12:23 +08:00
Sanskar Jhajharia 8dec45ff87
MINOR: Cleanup Connect Module (3/n) (#20156)
Now that Kafka supports Java 17, this PR makes some changes in the
connect module. The changes in this PR are limited to only some files;
future PR(s) shall follow.

The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()

Modules target: runtime/src/test
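
For illustration, the shape of these replacements — note the `of`
factories reject nulls and return fully immutable collections, unlike
`Arrays.asList`:

```
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CollectionFactoryExamples {
    // Before -> after for each replacement named above.
    List<String> singleOld = Collections.singletonList("a");
    List<String> singleNew = List.of("a");

    List<String> listOld = Arrays.asList("a", "b");
    List<String> listNew = List.of("a", "b");

    Map<String, Integer> mapOld = Collections.singletonMap("a", 1);
    Map<String, Integer> mapNew = Map.of("a", 1);

    Set<String> setOld = Collections.singleton("a");
    Set<String> setNew = Set.of("a");
}
```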

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
2025-08-20 04:17:39 +08:00
Kamal Chandraprakash f0c3d93104
KAFKA-19597: Stop the RSM after closing the remote-log reader threads to handle requests gracefully (#20342)
During shutdown, when the RSM closes first, the ongoing requests might
throw an error. To handle the ongoing requests gracefully, the RSM is
now closed after the remote-log reader thread pools.
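
A minimal sketch of the shutdown ordering, using a plain
ExecutorService for the reader pool; names and the timeout are
illustrative:

```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class RemoteLogShutdownSketch {
    static void close(ExecutorService remoteLogReaderPool, AutoCloseable remoteStorageManager) throws Exception {
        // Drain the reader threads first so in-flight fetches complete...
        remoteLogReaderPool.shutdown();
        if (!remoteLogReaderPool.awaitTermination(30, TimeUnit.SECONDS)) {
            remoteLogReaderPool.shutdownNow();
        }
        // ...and only then release the RSM they depend on.
        remoteStorageManager.close();
    }
}
```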

Reviewers: Satish Duggana <satishd@apache.org>
2025-08-19 21:56:27 +05:30
Sanskar Jhajharia 0da9cacffa
MINOR: Cleanups in Tools Module (3/n) (#20332)
This PR aims at cleaning up the tools module further by getting rid of
some extra code which can be replaced by `record`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-19 22:24:13 +08:00
Chih-Yuan Chien 38c3a411e9
MINOR: Fix typo and docs (#20373)
Fix typo and docs in following.
```
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
clients/src/main/resources/common/message/FetchRequest.json
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
```

Reviewers: Kuan-Po Tseng <brandboat@gmail.com>, Lan Ding
 <isDing_L@163.com>, Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
 <payang@apache.org>
2025-08-19 19:04:41 +08:00
Shashank 656242775c
KAFKA-15307: Kafka Streams configuration docs outdated (#20329)
Updated the Kafka Streams configuration documentation to be current
with version 4.0.0.

Reviewers: Matthias J. Sax <matthias@confluent.io>
2025-08-17 13:13:35 -07:00
Yunchi Pang 4a5562c341
KAFKA-19306 Migrate LogCompactionTester to tools module (#19905)
jira: [KAFKA-19306](https://issues.apache.org/jira/browse/KAFKA-19306)

log
```
Producing 1000000 messages..to topics
log-cleaner-test-849894102467800668-0
Logging produce requests to
/tmp/kafka-log-cleaner-produced-6049271649847384547.txt
Sleeping for 20seconds...
Consuming messages...
Logging consumed messages to
/tmp/kafka-log-cleaner-consumed-7065252868189829937.txt
1000000 rows of data produced, 120176 rows of data consumed (88.0%
reduction).
De-duplicating and validating output files...
Validated 90057 values, 0 mismatches.
Data verification is completed
```
result
```
================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-07-10--001
run time:         1 minute 2.051 seconds
tests run:        1
passed:           1
flaky:            0
failed:           0
ignored:          0
================================================================================
test_id:
kafkatest.tests.tools.log_compaction_test.LogCompactionTest.test_log_compaction.metadata_quorum=ISOLATED_KRAFT
status:     PASS
run time:   1 minute 1.809 seconds
```

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-08-18 02:49:06 +08:00
Maros Orsak 93068b4a1b
MINOR: Fix the compression arguments in TestLinearWriteSpeed (#20349)
This PR fixes a problem related to `TestLinearWriteSpeed`.  During my
work on KIP-780, I discovered that benchmarks for `TestLinearWriteSpeed`
do not account for compression algorithms. It always uses
`Compression.NONE` when creating records. The problem was introduced in
this PR [1].

[1] - https://github.com/apache/kafka/pull/17736

Reviewers: Ken Huang <s7133700@gmail.com>, Mickael Maison
<mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-17 19:21:51 +08:00
Ming-Yen Chung cae9848160
KAFKA-19447 Replace PartitionState with PartitionRegistration in makeFollower/makeLeader (#20335)
Follow-up to
[KAFKA-18486](https://issues.apache.org/jira/browse/KAFKA-18486)
* Replace PartitionState with PartitionRegistration in
makeFollower/makeLeader
* Remove PartitionState.java since it is no longer referenced

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
 Chia-Ping Tsai <chia7712@gmail.com>
2025-08-17 18:14:09 +08:00
Hong-Yi Chen bf0e6ba700
KAFKA-19384 The passing of BrokerRegistrationRequestTest is a false positive (#20338)
Fixes a false positive in `BrokerRegistrationRequestTest` caused by
`isMigratingZkBroker`, and migrates the test from Scala to Java.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-17 01:19:10 +08:00
PoAn Yang 990cb5c06c
KAFKA-18884 Move TransactionMetadata to transaction-coordinator module (#19699)
1. Move TransactionMetadata to transaction-coordinator module.
2. Rewrite TransactionMetadata in Java.
3. The `topicPartitions` field uses `HashSet` instead of `Set`, because
it's a mutable field.
4. In Scala, the `prepare*` methods can use the current values as
default inputs in `prepareTransitionTo`; Java, however, doesn't support
default parameter values. To avoid a lot of duplicated code or assigning
a value to the wrong field, we add a private class `TransitionData`. It
takes the current `TransactionMetadata` values as defaults, and the
`prepare*` methods just need to assign the updated values.
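
The workaround in point 4 amounts to snapshotting the current values
into a mutable holder and overriding only what changes; a schematic
sketch with the field set heavily reduced from the real class:

```
// Schematic: the real TransactionMetadata has many more fields.
final class TransactionMetadataSketch {
    final long producerId;
    final short producerEpoch;

    TransactionMetadataSketch(long producerId, short producerEpoch) {
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
    }

    // Private holder pre-filled with current values, standing in for Scala default arguments.
    private class TransitionData {
        long producerId = TransactionMetadataSketch.this.producerId;
        short producerEpoch = TransactionMetadataSketch.this.producerEpoch;
    }

    TransactionMetadataSketch prepareIncrementProducerEpoch() {
        TransitionData data = new TransitionData();
        data.producerEpoch++;  // only the changed field is assigned
        return new TransactionMetadataSketch(data.producerId, data.producerEpoch);
    }
}
```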

Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
 <alivshits@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-16 02:10:52 +08:00
OuO 27647c7c7c
MINOR: Remove the MetaLogShim namings (#20357)
Correct parameter name from `logManager` to `raftClient` (leftover from
PR #10705)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-08-16 02:02:56 +08:00
majialong 5078b228f5
MINOR: Remove broken link in CoordinatorMetricsShard javadoc (#20355)
CoordinatorMetricsShard was split into a separate module in
https://github.com/apache/kafka/pull/16883, causing the link in the
javadoc to become invalid.

So, remove the broken link in the CoordinatorMetricsShard javadoc.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Sanskar Jhajharia
<sjhajharia@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-16 00:02:32 +08:00
Kevin Wu 833e25f015
KAFKA-19605; Fix the busy loop occurring in kraft client observers (#20354)
The broker observer should not read update voter set timer value when
polling to determine its backoff, since brokers cannot auto-join the
KRaft voter set. If auto-join or kraft.version=1 is not supported,
controller observers should not read this timer either when polling.

The updateVoterSetPeriodMs timer is not something that should be
considered when calculating the backoff returned by polling, since this
timer does not represent the same thing as the fetchTimeMs timer.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, José Armando García
 Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>,
 Kuan-Po Tseng <brandboat@gmail.com>
2025-08-15 10:43:46 -04:00
Jhen-Yung Hsu 55260e9835
KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest to clients-integration-tests module (#20339)
This PR does the following:

- Rewrite to new test infra.
- Rewrite to java.
- Move to clients-integration-tests.
- Add `ensureConsistentMetadata` method to `ClusterInstance`,
similar to `ensureConsistentKRaftMetadata` in the old infra, and
refactor related code.

Reviewers: TengYao Chi <frankvicky@apache.org>, Ken Huang
 <s7133700@gmail.com>
2025-08-15 17:44:47 +08:00
stroller 58d894170a
MINOR: Fix typo in `AdminBootstrapAddresses` (#20352)
Fix the typo in `AdminBootstrapAddresses`.

Reviewers: TengYao Chi <frankvicky@apache.org>, Ken Huang
<s7133700@gmail.com>
2025-08-15 14:19:40 +08:00
Ming-Yen Chung c4fb1008c4
MINOR: Use lambda expressions instead of ImmutableValue for Gauges (#20351)
Refactor metric gauges instantiation to use lambda expressions instead
of ImmutableValue.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-08-14 20:35:21 +08:00
Robert Young 3067f15caf
KAFKA-19596: Improve visibility when topic auto-creation fails (#20340)
Log a warning for each topic that failed to be created as a result of
automatic creation. This makes the underlying cause more visible so
users can take action.

Previously, at the default log level, you could only see that the broker
was attempting to auto-create topics. If the creation failed, it was
logged only at debug.
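
A sketch of the intended logging shape — logger wiring, method name, and
message text are illustrative, not the PR's exact code:

```
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class AutoCreateLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(AutoCreateLoggingSketch.class);

    // One warning per failed topic so the root cause is visible at the default log level.
    static void logFailures(Map<String, ? extends Throwable> failedTopics) {
        failedTopics.forEach((topic, cause) ->
            log.warn("Failed to auto-create topic {}", topic, cause));
    }
}
```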

Signed-off-by: Robert Young <robertyoungnz@gmail.com>

Reviewers: Luke Chen <showuon@gmail.com>, Kuan-Po Tseng <brandboat@gmail.com>
2025-08-14 10:47:12 +08:00
Kevin Wu 92d8cb562a
KAFKA-19078 Automatic controller addition to cluster metadata partition (#19589)
Add the `controller.quorum.auto.join.enable` configuration. When enabled
with KIP-853 supported, follower controllers who are observers (their
replica id + directory id are not in the voter set) will:

- Automatically remove voter set entries which match their replica id
but not directory id by sending the `RemoveVoterRPC` to the leader.
- Automatically add themselves as a voter when their replica id is not
present in the voter set by sending the `AddVoterRPC` to the leader.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Chia-Ping
 Tsai <chia7712@gmail.com>
2025-08-13 23:20:18 +08:00
Sanskar Jhajharia dbf3808f53
MINOR: Add test coverage for StorageTool format command feature validation (#20303)
### Summary
Adds comprehensive test coverage for the StorageTool format command
feature validation, including tests for valid feature overrides, invalid
feature detection, and multiple feature specifications. Also adds debug
output to help with troubleshooting format operations.

### Changes Made

#### Test Coverage Improvements
- **`testFormatWithReleaseVersionAndFeatureOverride()`**: Tests that
feature overrides work correctly when specified with `--feature` flag

- **`testFormatWithInvalidFeatureThrowsError()`**: Tests error handling
for unsupported features

- **`testFormatWithMultipleFeatures()`**: Tests multiple feature
specifications in a single format command

#### Debug Output Enhancement
- **Formatter.java**: Added debug output to print bootstrap metadata
during format operations
  - Helps with troubleshooting format issues by showing the complete
bootstrap metadata being written
  - Improves visibility into what features and configurations are being
applied

#### Test Updates
- **FormatterTest.java**: Updated existing tests to account for the new
debug output

### Related
- [KIP-1022: Formatting and Updating Features](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1022%3A+Formatting+and+Updating+Features)

Reviewers: Kevin Wu <kevin.wu2412@gmail.com>, Justine Olshan
 <jolshan@confluent.io>
2025-08-12 12:51:39 -07:00