* Log an error message if `broker.heartbeat.interval.ms * 2` is larger than
`broker.session.timeout.ms`.
* Add test case
`testLogBrokerHeartbeatIntervalMsShouldBeLowerThanHalfOfBrokerSessionTimeoutMs`.
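A minimal sketch of the relationship being validated (hypothetical class and method names, not the actual broker configuration code):
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch only; the real check lives in the broker configuration code.
class HeartbeatIntervalCheck {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatIntervalCheck.class);

    static void validate(long brokerHeartbeatIntervalMs, long brokerSessionTimeoutMs) {
        // Warn when heartbeats are too infrequent relative to the session timeout.
        if (brokerHeartbeatIntervalMs * 2 > brokerSessionTimeoutMs) {
            LOG.error("broker.heartbeat.interval.ms ({} ms) should be no more than half of "
                    + "broker.session.timeout.ms ({} ms)",
                brokerHeartbeatIntervalMs, brokerSessionTimeoutMs);
        }
    }
}
```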
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add tests for producer state listing with a brokerId, without a brokerId,
and with an invalid brokerId.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
jira: https://issues.apache.org/jira/browse/KAFKA-19011 kip:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1172%3A+Improve+EndToEndLatency+tool
This PR improves the usability and maintainability of the
`kafka-e2e-latency.sh` tool:
- Replaces fixed-index argument parsing with a proper argument parser
(joptsimple)
- Adds support for configuring:
  - --record-key-size: size of the message key
  - --num-headers: number of headers per message
  - --record-header-key-size: size of each header key
  - --record-header-size: size of each header value
- Renames existing arguments to align with Kafka CLI conventions:
- broker_list → bootstrap-server
- num_messages → num-records
- message_size_bytes → record-size
- properties_file → command-config
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The original implementation uses a linear search to find the least
loaded process in O(n); we can replace this with look-ups in a heap in
O(log(n)), as described below.
Active tasks: For active tasks, we can do exactly the same assignment as
in the original algorithm by first building a heap (by load) of all
processes. When we assign a task, we pick the head off the heap, assign
the task to it, update the load, and re-insert it into the heap in
O(log(n)).
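A minimal sketch of this heap-based assignment, using hypothetical stand-in types rather than the actual Streams assignor classes:
```java
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-ins for the assignor's process/task types; not the actual
// Kafka Streams classes.
record ProcessState(String processId, List<String> tasks) {
    double load() {
        return tasks.size(); // simplified load metric
    }
}

class ActiveTaskHeapAssignment {
    static void assign(List<ProcessState> processes, List<String> activeTasks) {
        // Build the heap (by load) of all processes; each assignment is then O(log n).
        // Assumes at least one process exists.
        PriorityQueue<ProcessState> byLoad =
            new PriorityQueue<>((a, b) -> Double.compare(a.load(), b.load()));
        byLoad.addAll(processes);

        for (String task : activeTasks) {
            ProcessState leastLoaded = byLoad.poll(); // pick the head, O(log n)
            leastLoaded.tasks().add(task);            // assign the task, updating its load
            byLoad.offer(leastLoaded);                // re-insert in O(log n)
        }
    }
}
```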
Standby tasks: For standby tasks, we cannot do this optimization
directly, because of the order in which we assign tasks:
1. We first try to assign task A to a process that previously owned A.
2. If we did not find such a process, we assign A to the least loaded
node.
3. We now try to assign task B to a process that previously owned B
4. If we did not find such a process, we assign B to the least loaded
node
...
The problem is that we cannot efficiently keep a heap (by load)
throughout this process, because finding and removing the process that
previously owned A (and B and…) in the heap is O(n). We therefore need
to change the order of evaluation to be able to use a heap:
1. Try to assign all tasks A, B, … to a process that previously owned the
task.
2. Build a heap.
3. Assign all remaining tasks to the least-loaded process that does not
yet own the task. Since at most NumStandbyReplicas already own the task,
we can do it by removing up to NumStandbyReplicas from the top of the
heap in O(log(n)), so we get O(log(NumProcesses)*NumStandbyReplicas).
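A hedged sketch of step 3, reusing the hypothetical ProcessState type from the sketch above: pop heap entries until we find a process that does not yet own the task (at most NumStandbyReplicas + 1 pops), then re-insert everything.
```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

class StandbyTaskHeapAssignment {
    // Assigns one remaining standby task to the least-loaded process that does not
    // already own it. `ownedTasks` maps processId -> tasks assigned in earlier steps.
    static void assignRemaining(PriorityQueue<ProcessState> byLoad,
                                String task,
                                Map<String, Set<String>> ownedTasks) {
        Deque<ProcessState> alreadyOwning = new ArrayDeque<>();
        ProcessState target = null;
        // At most NumStandbyReplicas processes already own the task, so we pop a
        // bounded number of entries, each pop/offer costing O(log n).
        while (!byLoad.isEmpty()) {
            ProcessState candidate = byLoad.poll();
            if (ownedTasks.getOrDefault(candidate.processId(), Set.of()).contains(task)) {
                alreadyOwning.push(candidate); // skip: this process already owns the task
            } else {
                target = candidate;
                break;
            }
        }
        if (target != null) {
            target.tasks().add(task);
            byLoad.offer(target);
        }
        // Re-insert the skipped owners so the heap stays complete for the next task.
        while (!alreadyOwning.isEmpty()) {
            byLoad.offer(alreadyOwning.pop());
        }
    }
}
```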
Note that the change in order changes the resulting standby assignments
(although this difference does not show up in the existing unit tests).
I would argue that the new order of assignment will actually yield
better assignments, since the assignment will be more sticky, which has
the potential to reduce the amount of state we have to restore from the
changelog topic after assignments.
In our worst-performing benchmark, this improves the runtime by ~107x.
Reviewers: Bill Bejeck <bbejeck@apache.org>
*What*
https://issues.apache.org/jira/browse/KAFKA-18220
- Currently, `AsyncConsumerMetrics` extends `KafkaConsumerMetrics`, but
is used by both `AsyncKafkaConsumer` and `ShareConsumerImpl`.
- `ShareConsumerImpl` only needs the async consumer metrics (the metrics
associated with the new consumer threading model).
- This needs to be fixed; we are unnecessarily keeping
`KafkaConsumerMetrics` as a parent class for `ShareConsumer` metrics.
Fix:
- In this PR, we have removed the dependency of `AsyncConsumerMetrics`
on `KafkaConsumerMetrics` and made it an independent class which both
`AsyncKafkaConsumer` and `ShareConsumerImpl` will use.
- The "`asyncConsumerMetrics`" field represents the metrics associated
with the new consumer threading model (like application event queue
size, background queue size, etc).
- The "`kafkaConsumerMetrics`" and "`kafkaShareConsumerMetrics`" fields
denote the actual consumer metrics for `KafkaConsumer` and
`KafkaShareConsumer` respectively.
Reviewers: Andrew Schofield <aschofield@confluent.io>
A few cleanups including Java 17 syntax, collections and assertEquals() order
Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Ensure that metrics are retrieved and displayed (when requested) before
`Consumer.close()` is called. This is important because metrics are
technically supposed to be removed on `Consumer.close()`, which means
retrieving them _after_ `close()` would yield an empty map.
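An illustrative ordering sketch using the public Consumer API (the tool's actual code differs):
```java
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

class MetricsBeforeCloseExample {
    // Illustrative ordering only: snapshot and print the metrics while the consumer is
    // still open, because close() removes them and a later metrics() call would be empty.
    static void printMetricsThenClose(Consumer<byte[], byte[]> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        metrics.forEach((name, metric) ->
            System.out.println(name.group() + ":" + name.name() + " = " + metric.metricValue()));
        consumer.close();
    }
}
```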
Reviewers: Andrew Schofield <aschofield@confluent.io>
This is an attempt at improving the client configuration files. We now
have sections and comments similar to the other properties files.
Reviewers: Kirk True <ktrue@confluent.io>, Luke Chen <showuon@gmail.com>
---------
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Add a lower bound to num.replica.fetchers.
Reviewers: PoAn Yang <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>,
Ken Huang <s7133700@gmail.com>, jimmy <wangzhiwang611@gmail.com>,
Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
issue: [KAFKA-19616](https://issues.apache.org/jira/browse/KAFKA-19616)
**why**: validate log compaction works correctly with compressed data.
**what**: adds compression config options to `LogCompactionTester` tool
and extends test coverage to validate log compaction with different
compression types and levels.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
In the 4.1 `upgrade-guide` describing the new KIP-1071 protocol, it would
be helpful to display the configs you can set via `kafka-configs.sh`
with `streams` prepended to the configs; the command will fail
otherwise.
Reviewers: Andrew J Schofield <aschofield@apache.org>, Matthias J Sax
<mjsax@apache.org>, Genseric Ghiro <gghiro@confluent.io>
Clean up default configs in
AutoTopicCreationManager#createStreamsInternalTopics. The streams
protocol should be consistent with Kafka Streams using the classic
protocol, which would create the internal topics using CreateTopic and
therefore use the controller config.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
### Summary
This PR fixes two critical issues related to producer batch splitting
that can cause infinite retry loops and stack overflow errors when batch
sizes are significantly larger than broker-configured message size
limits.
### Issues Addressed
- **KAFKA-8350**: Producers endlessly retry batch splitting when
`batch.size` is much larger than topic-level `message.max.bytes`,
leading to infinite retry loops with "MESSAGE_TOO_LARGE" errors
- **KAFKA-8202**: Stack overflow errors in
`FutureRecordMetadata.chain()` due to excessive recursive splitting
attempts
### Root Cause
The existing batch splitting logic in
`RecordAccumulator.splitAndReenqueue()` always used the configured
`batchSize` parameter for splitting, regardless of whether the batch had
already been split before. This caused:
1. **Infinite loops**: When `batch.size` (e.g., 8MB) >>
`message.max.bytes` (e.g., 1MB), splits would never succeed since the
split size was still too large
2. **Stack overflow**: Repeated splitting attempts created deep call
chains in the metadata chaining logic
### Solution
Implemented progressive batch splitting logic:
```java
int maxBatchSize = this.batchSize;
if (bigBatch.isSplitBatch()) {
    maxBatchSize = Math.max(bigBatch.maxRecordSize, bigBatch.estimatedSizeInBytes() / 2);
}
```
__Key improvements:__
- __First split__: Uses original `batchSize` (maintains backward
compatibility)
- __Subsequent splits__: Uses the larger of:
  - `maxRecordSize`: ensures we can always split down to individual records
  - `estimatedSizeInBytes() / 2`: provides geometric reduction for faster convergence
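As a quick illustration of the geometric reduction, here is a standalone sketch with hypothetical sizes (8 MB batch, 1 KB largest record, 1 MB broker limit) showing how few halvings are needed to converge:
```java
public class SplitSizeConvergence {
    public static void main(String[] args) {
        long brokerLimit = 1024 * 1024;        // hypothetical message.max.bytes = 1 MB
        long estimatedSize = 8L * 1024 * 1024; // hypothetical batch.size = 8 MB
        long maxRecordSize = 1024;             // hypothetical largest single record = 1 KB
        int splits = 0;
        while (estimatedSize > brokerLimit) {
            // Mirrors the halving rule above: never smaller than the largest record.
            estimatedSize = Math.max(maxRecordSize, estimatedSize / 2);
            splits++;
        }
        System.out.println("converged after " + splits + " splits"); // 8 MB -> 4 -> 2 -> 1
    }
}
```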
### Testing
Added comprehensive test
`testSplitAndReenqueuePreventInfiniteRecursion()` that:
- Creates oversized batches with 100 records of 1KB each
- Verifies splitting can reduce batches to single-record size
- Ensures no infinite recursion (safety limit of 100 operations)
- Validates no data loss or duplication during splitting
- Confirms all original records are preserved with correct keys
### Backward Compatibility
- No breaking changes to public APIs
- First split attempt still uses original `batchSize` configuration
- Progressive splitting only engages for retry scenarios
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson
<jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
---------
Co-authored-by: Michael Knox <mrknox@amazon.com>
These tests were written while finalizing the approach for making the
in-flight state class thread-safe, but the approach later changed and
SharePartition now always requires the lock to change in-flight state.
Hence these tests are incorrect and do not add any value.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Improve the wording in the upgrade doc slightly. Also fix a tiny
annoyance in the output from the message generator.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
In KAFKA-19570 we implemented offline migration between groups, that is,
the following integration test or system test should be possible:
Test A:
- Start a streams application with classic protocol, process up to a
certain offset, commit the offset, and shut down.
- Start the same streams application with streams protocol (same app ID!).
- Make sure that the offsets before the one committed in the first run
are not reprocessed in the second run.
Test B:
- Start a streams application with streams protocol, process up to a
certain offset, commit the offset, and shut down.
- Start the same streams application with classic protocol (same app ID!).
- Make sure that the offsets before the one committed in the first run
are not reprocessed in the second run.
We have unit tests that make sure that non-empty groups will not be
converted. This should be enough.
Reviewers: Bill Bejeck <bbejeck@apache.org>
Minor PR to rename maxInFlightMessages to maxInFlightRecords to
maintain consistency in share partition related classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This field was used for replica_id, but after
51c833e795,
the OffsetsForLeaderEpochRequest directly relies on the internal structs
generated by the automated protocol. Therefore, we can safely remove it.
Reviewers: Lan Ding <isDing_L@163.com>, TengYao Chi
<frankvicky@apache.org>
`shouldCallOldImplementationExceptionHandler` should be a test case, but
somehow lacks the `@Test` annotation.
Reviewers: Ken Huang <s7133700@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
Now that Kafka supports Java 17, this PR makes some changes in the connect
module. The changes in this PR are limited to only some files; future
PR(s) shall follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Modules target: runtime/src/main
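An illustrative before/after of the replacements listed above (not code taken from the PR):
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CollectionsCleanupExample {
    // Before: pre-Java-9 factory idioms.
    List<String> emptyOld = Collections.emptyList();
    List<String> singleOld = Collections.singletonList("a");
    List<String> listOld = Arrays.asList("a", "b");
    Map<String, Integer> mapOld = Collections.singletonMap("k", 1);
    Set<String> setOld = Collections.singleton("x");

    // After: immutable factory methods, available since Java 9.
    List<String> emptyNew = List.of();
    List<String> singleNew = List.of("a");
    List<String> listNew = List.of("a", "b");
    Map<String, Integer> mapNew = Map.of("k", 1);
    Set<String> setNew = Set.of("x");
}
```
Note that unlike `Arrays.asList()`, the `List.of()`/`Map.of()`/`Set.of()` factories return immutable collections and reject null elements, which is usually what these call sites want anyway.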
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
There is a small logic bug in topic replay. If a topic is created and
then removed before the TopicsDelta is applied, we end up with the
deleted topic in createdTopics on the delta. This issue is fixed by
removing the topicName from createdTopics when replaying
RemoveTopicRecord, to make sure the deleted topics won't appear in
createdTopics.
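A simplified sketch of the fix, with a hypothetical model of the delta (the real TopicsDelta in the metadata module is more involved):
```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of the delta; field and method names are illustrative.
class TopicsDeltaSketch {
    final Map<UUID, String> createdTopics = new HashMap<>();
    final Set<UUID> deletedTopicIds = new HashSet<>();

    void replayTopicRecord(UUID topicId, String name) {
        createdTopics.put(topicId, name);
    }

    void replayRemoveTopicRecord(UUID topicId) {
        deletedTopicIds.add(topicId);
        // The fix: a topic created and then removed within the same delta must not
        // linger in createdTopics once its RemoveTopicRecord is replayed.
        createdTopics.remove(topicId);
    }
}
```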
Reviewers: José Armando García Sancio <jsancio@apache.org>, Kevin Wu
<kevin.wu2412@gmail.com>, Alyssa Huang <ahuang@confluent.io>
What I observed is that if I run both combinations useNewProtocol=true
and useNewProtocol=false, it would often fail the second time, but if I
only run the second variation useNewProtocol=false it works, and running
only the first variation useNewProtocol=true also works. So this points
to some state that is not cleared between the tests - and indeed, the
test creates a topic “inputTopic”, produces to it, but doesn’t delete
it, so the second variation will produce to it again and then run with
twice the data.
I also reduced the heartbeat interval and session timeout, since some of
the tests need to wait for the old consumer to leave, which (sigh) Kafka
Streams doesn't do, so we have to wait until it gets kicked out by the
session timeout. Previously we waited for 45 seconds; now we wait only
about 1 second.
Reviewers: Bill Bejeck <bbejeck@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
As described in
[jira](https://issues.apache.org/jira/browse/KAFKA-19625), this PR
replaces `consumer.config` and `producer.config` with
`command-config` for kafka-verifiable-producer.sh and
kafka-verifiable-consumer.sh.
Reviewers: Andrew Schofield <aschofield@confluent.io>
As per the suggestion by @adixitconfluent and @chirag-wadhwa5,
[here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004),
I have refactored the variable and method names in the code.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chirag Wadhwa
<cwadhwa@confluent.io>
This is the first part of the implementation of
[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)
The purpose of this pull request is for the broker to start returning
the correct offset when it receives -6 as a timestamp in a ListOffsets
API request.
Added unit tests for the new timestamp.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This PR aims to add documentation to `alterLogLevelConfigs` method to
remind users to use valid LogLevelConfig constants.
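For illustration, a hedged example of setting a broker logger level with one of the valid LogLevelConfig constants via the public Admin API (this is not the alterLogLevelConfigs helper itself; class and method names here are illustrative):
```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.LogLevelConfig;

class BrokerLoggerLevelExample {
    // Sets a broker logger to one of the valid LogLevelConfig constants; passing an
    // arbitrary string here is exactly the mistake the new documentation warns against.
    static void setLoggerToDebug(Admin admin, int brokerId, String loggerName) throws Exception {
        ConfigResource logger =
            new ConfigResource(ConfigResource.Type.BROKER_LOGGER, Integer.toString(brokerId));
        AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry(loggerName, LogLevelConfig.DEBUG_LOG_LEVEL), AlterConfigOp.OpType.SET);
        admin.incrementalAlterConfigs(Map.of(logger, List.of(op))).all().get();
    }
}
```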
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR removes associated logging within NetworkClient to reduce noise
and streamline the client code.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Arthur
<mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The PR fixes the batch alignment issue when partitions are re-assigned.
During the initial read of state, the batches can be broken arbitrarily.
Say the start offset is 10 and the cache contains the [15-18] batch
during initialization. When a fetch happens at offset 10 and the fetched
batch contains 10 records, i.e. [10-19], then correct batches will be
created if maxFetchRecords is greater than 10. But if maxFetchRecords is
less than 10, then the last offset of the batch is determined, which
will be 19. Hence the acquire method will incorrectly create a batch of
[10-19] while [15-18] already exists. The check below is required to
resolve the issue:
```
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
    lastAcquiredOffset = lastOffset;
}
```
While testing other cases, further issues were found with updating the
gap offset, acquiring records prior to the share partition's end offset,
and determining the next fetch offset with compacted topics. All of
these issues can arise mainly during the initial read window after
partition re-assignment.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
<adixit@confluent.io>, Chirag Wadhwa <cwadhwa@confluent.io>
This PR ensures that describeTopics correctly propagates its timeoutMs
setting to the underlying describeCluster call. Integration tests were
added to verify that the API now fails with a TimeoutException when
brokers do not respond within the configured timeout.
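An illustrative use of the per-call timeout via the public Admin API (topic name and timeout value are hypothetical):
```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.common.errors.TimeoutException;

class DescribeTopicsTimeoutExample {
    // The per-call timeout should now bound the whole operation, including the
    // internal describeCluster lookup.
    static void describeWithTimeout(Admin admin) throws InterruptedException {
        try {
            admin.describeTopics(List.of("test-topic"), new DescribeTopicsOptions().timeoutMs(5_000))
                 .allTopicNames()
                 .get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                System.out.println("describeTopics timed out within the configured 5s");
            }
        }
    }
}
```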
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
`testAsyncConsumerClassicConsumerSubscribeInvalidTopicCanUnsubscribe` does not align with the test case. This patch renames the test name to describe the test case more precisely.
Reviewers: TengYao Chi <frankvicky@apache.org>
This PR applies the same partition leader check for `StreamsGroupCommand` as
`ShareGroupCommand` and `ConsumerGroupCommand` to avoid the command
execution timeout.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
Offline migration essentially preserves offsets and nothing else. So
effectively we write tombstones for the classic group type when a streams
heartbeat is sent with the group ID of an empty classic group, and
write tombstones for the streams group type when a classic consumer
attempts to join with the group ID of an empty streams group.
Reviewers: Bill Bejeck <bbejeck@apache.org>, Sean Quah
<squah@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>
This is a follow-up PR for https://github.com/apache/kafka/pull/19699.
* Update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions
* Update `TxnTransitMetadata` comment, because it's not immutable.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Justine Olshan
<jolshan@confluent.io>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
Update the KRaft dynamic voter set documentation. In Kafka 4.1, we
introduced a powerful new feature that enables seamless migration from a
static voter set to a dynamic voter set.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch adds the --include argument to the ConsumerPerformance tool.
ConsoleConsumer and ConsumerPerformance serve different purposes but
share common functionality for message consumption. Currently, there's
an inconsistency in their command-line interfaces:
- ConsoleConsumer supports an --include argument that allows users to
specify a regular expression pattern to filter topics for consumption
- ConsumerPerformance lacks this topic filtering capability, requiring
users to specify a single topic explicitly via --topic argument
This inconsistency creates two problems:
- Similar tools should provide similar topic selection capabilities for
better user experience
- Users cannot test consumer performance across multiple topics or
dynamically matching topic sets, making it difficult to test realistic
scenarios
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>