Now that Kafka supports Java 17, this PR makes some changes in the connect
module. The changes in this PR are limited to only some files; future PRs
will follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Modules target: runtime/src/main
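For illustration, a minimal before/after sketch of the replacement pattern
(the example values are made up):
```
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CollectionFactoryExample {
    public static void main(String[] args) {
        // Before: pre-Java-9 factory helpers.
        List<String> oldList = Arrays.asList("a", "b");
        Map<String, Integer> oldMap = Collections.singletonMap("key", 1);
        Set<String> oldSet = Collections.singleton("value");

        // After: Java 9+ immutable collection factories.
        List<String> newList = List.of("a", "b");
        Map<String, Integer> newMap = Map.of("key", 1);
        Set<String> newSet = Set.of("value");

        // Both styles produce equal collections for these inputs.
        System.out.println(oldList.equals(newList)); // true
        System.out.println(oldMap.equals(newMap));   // true
        System.out.println(oldSet.equals(newSet));   // true
    }
}
```
One behavioral difference to keep in mind: the `of()` factories reject `null`
elements and return fully immutable collections, whereas `Arrays.asList()`
permits nulls and supports `set()`.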
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
There is a small logic bug in topic replay. If a topic is created and
then removed before the TopicsDelta is applied, we end up with the
deleted topic in createdTopics on the delta. This issue is fixed by
removing the topic name from createdTopics when replaying
RemoveTopicRecord, to make sure deleted topics won't appear in
createdTopics, as sketched below.
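A minimal sketch of the fix with simplified stand-in types (the real
TopicsDelta / RemoveTopicRecord handling differs):
```
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for TopicsDelta; field names are illustrative.
class TopicsDeltaSketch {
    private final Set<String> createdTopics = new HashSet<>();
    private final Map<String, String> topicNamesById = new HashMap<>();

    void replayTopicRecord(String topicId, String name) {
        createdTopics.add(name);
        topicNamesById.put(topicId, name);
    }

    void replayRemoveTopicRecord(String topicId) {
        String name = topicNamesById.remove(topicId);
        // The fix: a topic created and then removed within the same delta
        // must not linger in createdTopics.
        if (name != null) {
            createdTopics.remove(name);
        }
    }
}
```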
Reviewers: José Armando García Sancio <jsancio@apache.org>, Kevin Wu
<kevin.wu2412@gmail.com>, Alyssa Huang <ahuang@confluent.io>
What I observed is that if I run both combinations useNewProtocol=true
and useNewProtocol=false, it would often fail the second time, but if I
only run the second variation useNewProtocol=false it works, and running
only the first variation useNewProtocol=true also works. So this points
to some state that is not cleared between the tests. And indeed, the
test creates a topic “inputTopic” and produces to it, but doesn’t delete
it, so the second variation produces to it again and then runs with
twice the data.
I also reduced the heartbeat interval and session timeout, since some of
the tests need to wait for the old consumer to leave, which (sigh) Kafka
Streams doesn't do, so we have to wait until it gets kicked out by the
session timeout. Previously we waited for 45 seconds; now we wait only
about 1 second.
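For reference, a sketch of the relevant consumer configuration (the values
here are illustrative, not the exact ones from the patch; brokers also
enforce a minimum via `group.min.session.timeout.ms`, which tests typically
lower):
```
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TimeoutConfigExample {
    public static Properties testConsumerConfig() {
        Properties props = new Properties();
        // A short heartbeat and session timeout so a departed member is
        // kicked out of the group quickly instead of after 45 seconds.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "300");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "1000");
        return props;
    }
}
```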
Reviewers: Bill Bejeck <bbejeck@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
As described in
[jira](https://issues.apache.org/jira/browse/KAFKA-19625), this PR
replaces `consumer.config` and `producer.config` with `command-config`
for kafka-verifiable-producer.sh and kafka-verifiable-consumer.sh.
Reviewers: Andrew Schofield <aschofield@confluent.io>
As per the suggestion by @adixitconfluent and @chirag-wadhwa5
[here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004),
I have refactored the code, renaming variables and methods for clarity.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chirag Wadhwa
<cwadhwa@confluent.io>
This is the first part of the implementation of
[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset).
The purpose of this pull request is for the broker to start returning
the correct offset when it receives -6 as a timestamp in a ListOffsets
API request.
Added unit tests for the new timestamp.
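A hypothetical sketch of the sentinel dispatch involved; the -6 constant's
name and the helper methods are assumed for illustration and not taken from
the PR (only -1 and -2 below are long-standing ListOffsets sentinels):
```
public class ListOffsetsSentinels {
    public static final long LATEST_TIMESTAMP = -1L;
    public static final long EARLIEST_TIMESTAMP = -2L;
    // Assumed name for the new KIP-1023 sentinel: -6 should resolve to the
    // offset from which followers can fetch from tiered storage.
    public static final long EARLIEST_PENDING_UPLOAD_TIMESTAMP = -6L;

    interface OffsetResolver {
        long earliestOffset();
        long latestOffset();
        long earliestPendingUploadOffset(); // hypothetical helper
    }

    static long resolve(long timestamp, OffsetResolver log) {
        if (timestamp == EARLIEST_PENDING_UPLOAD_TIMESTAMP) {
            return log.earliestPendingUploadOffset();
        } else if (timestamp == EARLIEST_TIMESTAMP) {
            return log.earliestOffset();
        } else if (timestamp == LATEST_TIMESTAMP) {
            return log.latestOffset();
        }
        throw new IllegalArgumentException("Unsupported timestamp sentinel: " + timestamp);
    }
}
```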
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This PR aims to add documentation to the `alterLogLevelConfigs` method to
remind users to use valid LogLevelConfig constants.
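As an illustration of using valid LogLevelConfig constants through the
public admin API (the broker id and logger name below are made up; the PR
itself only adds documentation):
```
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.LogLevelConfig;

public class AlterLogLevelExample {
    public static void setLogLevel(Admin admin) throws Exception {
        ConfigResource brokerLogger =
            new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0"); // broker id "0"
        AlterConfigOp op = new AlterConfigOp(
            // Use a LogLevelConfig constant rather than a free-form string.
            new ConfigEntry("kafka.server.KafkaApis", LogLevelConfig.DEBUG_LOG_LEVEL),
            AlterConfigOp.OpType.SET);
        admin.incrementalAlterConfigs(Map.of(brokerLogger, List.of(op))).all().get();
    }
}
```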
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR removes associated logging within NetworkClient to reduce noise
and streamline the client code.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Arthur
<mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The PR fixes the batch alignment issue when partitions are re-assigned.
During the initial read of state, the batches can be split arbitrarily.
Say the start offset is 10 and the cache contains a [15-18] batch at
initialization. When a fetch happens at offset 10 and the fetched batch
contains 10 records, i.e. [10-19], the correct batches will be created
if maxFetchRecords is greater than 10. But if maxFetchRecords is less
than 10, then the last offset of the batch is determined, which will be
19. Hence the acquire method will incorrectly create a batch of [10-19]
while [15-18] already exists. The check below is required to resolve the
issue:
```
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
    lastAcquiredOffset = lastOffset;
}
```
While testing other cases, further issues were found in updating the gap
offset, in acquiring records prior to the share partition's end offset,
and in determining the next fetch offset with compacted topics. All of
these issues can arise mainly during the initial read window after
partition re-assignment.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
<adixit@confluent.io>, Chirag Wadhwa <cwadhwa@confluent.io>
This PR ensures that describeTopics correctly propagates its timeoutMs
setting to the underlying describeCluster call. Integration tests were
added to verify that the API now fails with a TimeoutException when
brokers do not respond within the configured timeout.
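From the caller's side, the timeout now behaves as expected (topic name and
timeout value are illustrative):
```
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.common.errors.TimeoutException;

public class DescribeTopicsTimeoutExample {
    public static void describeWithTimeout(Admin admin) throws InterruptedException {
        try {
            // timeoutMs now also bounds the internal describeCluster call.
            admin.describeTopics(List.of("my-topic"), new DescribeTopicsOptions().timeoutMs(5000))
                 .allTopicNames()
                 .get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                System.err.println("Brokers did not respond within 5s: " + e.getCause());
            }
        }
    }
}
```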
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
`testAsyncConsumerClassicConsumerSubscribeInvalidTopicCanUnsubscribe` does
not align with the test case. This patch renames the test to describe the
test case more precisely.
Reviewers: TengYao Chi <frankvicky@apache.org>
This PR applies the same partition leader check for `StreamsGroupCommand` as
`ShareGroupCommand` and `ConsumerGroupCommand` to avoid the command
execution timeout.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
Offline migration essentially preserves offsets and nothing else. So,
effectively, we write tombstones for the classic group type when a
streams heartbeat is sent with the group ID of an empty classic group,
and write tombstones for the streams group type when a classic consumer
attempts to join with the group ID of an empty streams group.
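A simplified sketch of the rule; the types and method names below are
stand-ins, not the real group-coordinator implementation:
```
enum GroupType { CLASSIC, STREAMS }

// Stand-in for the coordinator's per-group state.
class GroupMigrationSketch {
    private final GroupType type;
    private final boolean empty;

    GroupMigrationSketch(GroupType type, boolean empty) {
        this.type = type;
        this.empty = empty;
    }

    /** A streams heartbeat arrived with this group's ID. */
    boolean shouldTombstoneClassicGroup() {
        // Only an empty classic group is converted: its type-specific records
        // are tombstoned while the committed offsets are preserved.
        return type == GroupType.CLASSIC && empty;
    }

    /** A classic consumer attempted to join with this group's ID. */
    boolean shouldTombstoneStreamsGroup() {
        return type == GroupType.STREAMS && empty;
    }
}
```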
Reviewers: Bill Bejeck <bbejeck@apache.org>, Sean Quah
<squah@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>
This is a follow-up PR for https://github.com/apache/kafka/pull/19699.
* Update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions
* Update the `TxnTransitMetadata` comment, because it's not immutable.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Justine Olshan
<jolshan@confluent.io>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
Update the KRaft dynamic voter set documentation. In Kafka 4.1, we
introduced a powerful new feature that enables seamless migration from a
static voter set to a dynamic voter set.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch adds the `--include` argument to the ConsumerPerformance tool.
ConsoleConsumer and ConsumerPerformance serve different purposes but
share common functionality for message consumption. Currently, there's
an inconsistency in their command-line interfaces:
- ConsoleConsumer supports an --include argument that allows users to
specify a regular expression pattern to filter topics for consumption
- ConsumerPerformance lacks this topic filtering capability, requiring
users to specify a single topic explicitly via the --topic argument
This inconsistency creates two problems:
- Similar tools should provide similar topic selection capabilities for
better user experience
- Users cannot test consumer performance across multiple topics or
dynamically matching topic sets, making it difficult to test realistic
scenarios
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
**Changes**: Use ClusterTest to rewrite
EligibleLeaderReplicasIntegrationTest.
**Validation**: Run the test 50 times locally with consistent success.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
issue: https://github.com/apache/kafka/pull/19905#discussion_r2282202312
What: Change `String[] topics` to `Set<String> topics` throughout
`LogCompactionTester`. Why: `Set<String>` is more modern and reduces the
need for array-to-collection conversions.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Lan Ding
<isDing_L@163.com>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
The ReassignPartitionsCommand shows the topic replicas on each broker.
When using the --generate option, it returns the current partition
replica assignment. However, the log directory for each current replica
is always shown as `any`. This makes it impossible for users to determine
which specific log directory is being used by each replica. Therefore,
we should fix this behavior.
```
Current partition replica assignment
{
  "version": 1,
  "partitions": [
    {
      "topic": "test1",
      "partition": 0,
      "replicas": [4, 2],
      "log_dirs": ["any", "any"]
    }
  ]
}
```
With this PR, the actual log directories are shown:
```
Current partition replica assignment
{
  "version": 1,
  "partitions": [
    {
      "topic": "test1",
      "partition": 0,
      "replicas": [4, 2],
      "log_dirs": ["/tmp/kraft-broker-logs234", "/tmp/kraft-broker-logs"]
    }
  ]
}
```
Reviewers: PoAn Yang <payang@apache.org>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
https://issues.apache.org/jira/browse/KAFKA-18699
This PR aims at cleaning up the `metadata` module further by getting rid
of some extra code which can be replaced by records.
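As an illustration of the pattern (a made-up value class, not one from the
module):
```
// Before: a hand-written immutable value class.
final class BrokerEndpointOld {
    private final String host;
    private final int port;

    BrokerEndpointOld(String host, int port) {
        this.host = host;
        this.port = port;
    }

    String host() { return host; }
    int port() { return port; }
    // ...plus hand-written equals(), hashCode(), and toString()
}

// After: a record derives the constructor, accessors, equals(),
// hashCode(), and toString() automatically.
record BrokerEndpoint(String host, int port) { }
```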
Reviewers: Ken Huang <s7133700@gmail.com>, Ming-Yen Chung
<mingyen066@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The test uses a regular consumer to commit offsets. The new protocol
requires a streams consumer since we are using streams groups; otherwise
we run into group ID conflicts.
Following the addition of the KafkaAdmin interface for setting offsets, a
Kafka Admin client is created and used to set the committed offsets
instead of instantiating a consumer.
Also enables all tests for the streams new protocol.
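A sketch of committing offsets through the Admin client (the group ID,
topic, and offset are illustrative, and the exact admin method used by the
test utilities may differ):
```
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitViaAdminExample {
    public static void commit(Admin admin) throws Exception {
        // Set the committed offset without instantiating a consumer,
        // avoiding group ID conflicts with the streams group.
        admin.alterConsumerGroupOffsets(
            "streams-app-id",
            Map.of(new TopicPartition("inputTopic", 0), new OffsetAndMetadata(42L))
        ).all().get();
    }
}
```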
Reviewers: Alieh Saeedi<asaeedi@confluent.io>, Kirk True
<ktrue@confluent.io>, Matthias Sax <mjsax@apache.org>, Bill Bejeck
<bbejeck@apache.org>
As per the current implementation in archiveRecords, when the LSO is
updated, if we have multiple record batches before the new LSO, then
only the first one gets archived. This is because of the following lines
of code:
`isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);`
`isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);`
The first archived record / batch will make `isAnyOffsetArchived` /
`isAnyBatchArchived` true, after which these expressions will
short-circuit and the methods `archivePerOffsetBatchRecords` /
`archiveCompleteBatch` will not be called again. This PR changes the
order of the expressions so that the short-circuit does not prevent
archiving all the required batches, as shown below.
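A sketch of the reordered form, which evaluates the archive call first so
it always runs:
```
isAnyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState) || isAnyOffsetArchived;
isAnyBatchArchived = archiveCompleteBatch(inFlightBatch, initialState) || isAnyBatchArchived;
```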
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
The `record-size` and `throughput` arguments don’t work in
`TestRaftServer`; the `recordsPerSec` and `recordSize` values are always
hard-coded.
- Fix the hard-coded `recordsPerSec` and `recordSize` values
- Add a "Required" description to command-line options to make them clear
to users
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR aims at cleaning up the `jmh-benchmarks` module further by
getting rid of some extra code which can be replaced by records.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
During broker restarts, the topic-based RemoteLogMetadataManager (RLMM)
constructs its state by reading the internal `__remote_log_metadata`
topic. When the partition is not ready to perform remote storage
operations, a ReplicaNotAvailableException is thrown back to the
consumer, and the client retries the request immediately.
This results in a lot of FETCH requests on the broker and ties up the
request handler threads. Using a CountDownLatch to reduce the frequency
of ReplicaNotAvailableException thrown back to the clients will improve
request handler thread usage on the broker.
Previously, for one consumer, when the RLMM was not ready for a
partition, ~9K FetchConsumer requests/sec were received on the broker.
With this patch, the number of FETCH requests dropped to ~600/sec.
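A minimal sketch of the gating idea, assuming a simplified API (the real
RLMM wiring differs):
```
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.errors.ReplicaNotAvailableException;

// Simplified stand-in: block briefly until the partition's metadata has
// been replayed instead of failing every FETCH immediately.
class RemoteLogMetadataGate {
    private final CountDownLatch initialized = new CountDownLatch(1);

    /** Called once the __remote_log_metadata topic has been replayed for the partition. */
    void markInitialized() {
        initialized.countDown();
    }

    /** Called on the fetch path before performing remote storage operations. */
    void ensureInitialized(long waitMs) throws InterruptedException {
        if (!initialized.await(waitMs, TimeUnit.MILLISECONDS)) {
            // Still not ready: the client backs off and retries, but far
            // less frequently than with an immediate error response.
            throw new ReplicaNotAvailableException("Partition not ready for remote storage operations");
        }
    }
}
```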
Reviewers: Lan Ding <isDing_L@163.com>, Satish Duggana
<satishd@apache.org>
Now that Kafka supports Java 17, this PR makes some changes in the connect
module. The changes in this PR are limited to only some files; future PRs
will follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Modules target: runtime/src/test
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
During shutdown, when the RSM closes first, the ongoing requests might
throw an error. To handle the ongoing requests gracefully, the RSM is now
closed after the remote-log reader thread pools.
Reviewers: Satish Duggana <satishd@apache.org>
This PR aims at cleaning up the tools module further by getting rid of
some extra code which can be replaced by records.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Fix typos and docs in the following files:
```
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
clients/src/main/resources/common/message/FetchRequest.json
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
```
Reviewers: Kuan-Po Tseng <brandboat@gmail.com>, Lan Ding
<isDing_L@163.com>, Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
<payang@apache.org>
This PR fixes a problem related to `TestLinearWriteSpeed`. During my
work on KIP-780, I discovered that the benchmarks for
`TestLinearWriteSpeed` do not account for compression algorithms;
`Compression.NONE` is always used when creating records. The problem was
introduced in this PR [1].
[1] - https://github.com/apache/kafka/pull/17736
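A sketch of the corrected record creation, assuming the `Compression`
builder API (the exact wiring in the tool differs):
```
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;

public class WriteSpeedRecordsExample {
    public static MemoryRecords buildRecords(CompressionType type, byte[] payload) {
        // Honor the requested compression type instead of always
        // falling back to Compression.NONE.
        Compression compression = Compression.of(type).build();
        return MemoryRecords.withRecords(compression, new SimpleRecord(payload));
    }
}
```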
Reviewers: Ken Huang <s7133700@gmail.com>, Mickael Maison
<mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Follow-up to
[KAFKA-18486](https://issues.apache.org/jira/browse/KAFKA-18486)
* Replace PartitionState with PartitionRegistration in
makeFollower/makeLeader
* Remove PartitionState.java since it is no longer referenced
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
Fixes a false positive in `BrokerRegistrationRequestTest` caused by
`isMigratingZkBroker`, and migrates the test from Scala to Java.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1. Move TransactionMetadata to the transaction-coordinator module.
2. Rewrite TransactionMetadata in Java.
3. The `topicPartitions` field uses `HashSet` instead of `Set`, because
it is a mutable field.
4. In Scala, the `prepare*` methods can use the current value as the
default input in `prepareTransitionTo`. However, Java does not support
default parameter values. To avoid a lot of duplicated code or assigning
a value to the wrong field, we add a private class `TransitionData`. It
takes the current `TransactionMetadata` values as defaults, and the
`prepare*` methods just assign the updated values, as sketched below.
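A simplified illustration of the pattern (the field set is trimmed down;
the real class carries much more state):
```
// Sketch: TransitionData copies the current metadata as defaults, and each
// prepare* method overwrites only the fields that actually change.
class TransactionMetadataSketch {
    private String state = "Empty";
    private long producerId = -1L;

    private class TransitionData {
        // Defaults are taken from the enclosing metadata's current values.
        String state = TransactionMetadataSketch.this.state;
        long producerId = TransactionMetadataSketch.this.producerId;
    }

    void prepareAbort() {
        TransitionData data = new TransitionData();
        data.state = "PrepareAbort"; // only the changed field is assigned
        transitionTo(data);
    }

    private void transitionTo(TransitionData data) {
        this.state = data.state;
        this.producerId = data.producerId;
    }
}
```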
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
CoordinatorMetricsShard was split into a separate module in
https://github.com/apache/kafka/pull/16883, causing the link in its
javadoc to become invalid. So, remove the broken link in the
CoordinatorMetricsShard javadoc.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Sanskar Jhajharia
<sjhajharia@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>