This commit updates CI to test against Java 24 instead of Java 23 which
is EOL.
Due to Spotbugs not having released version 4.9.4 yet, we can't run
Spotbugs on Java 24. Instead, we are choosing to run Spotbugs, and the
rest of the compile and validate build step, on Java 17 for now.
Once 4.9.4 has released, we will switch to using Java 24 for this.
Exclude spotbugs from the run-tests gradle action. Spotbugs is already
being run once in the build by "compile and validate", there is no
reason to run it again as part of executing tests.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Now that Kafka support Java 17, this PR makes some changes in tools
module. The changes in this PR are limited to only some files. A future
PR(s) shall follow. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Some minor changes to use the enhanced switch.
Sub modules targeted: tools/src/test
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The compiler warning is due to a lack of import. This patch imports the ApiException to fix it.
Reviewers: TengYao Chi <frankvicky@apache.org>, Yung
<yungyung7654321@gmail.com>
When using a connector that requires a schema, such as JDBC connectors,
with JSON messages, the current JSONConverter necessitates including the
schema within every message. To address this, we are introducing a new
parameter, schema.content, which allows you to provide the schema
externally. This approach not only reduces the size of the messages but
also facilitates the use of more complex schemas.
KIP :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter
Reviewers: Mickael Maison <mickael.maison@gmail.com>, TengYao Chi <frankvicky@apache.org>, Edoardo Comar <ECOMAR@uk.ibm.com>
The `state-change.log` file is being incorrectly rotated to
`stage-change.log.[date]`. This change fixes the typo to have the log
file correctly rotated to `state-change.log.[date]`
_No functional changes._
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Christo Lolov
<lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This flag also skips control records, so the description needs to be
updated.
---------
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Vincent Potucek
Along with the change: https://github.com/apache/kafka/pull/17952
([KIP-966](https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas)),
the semantics of `min.insync.replicas` config has small change, and add
some constraints. We should document them clearly.
Reviewers: Jun Rao <junrao@gmail.com>, Calvin Liu <caliu@confluent.io>,
Mickael Maison <mickael.maison@gmail.com>, Paolo Patierno
<ppatierno@live.com>, Federico Valeri <fedevaleri@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
OffsetFetchResponses can have three different error structures depending
on the version. Version 2 adds a top level error code for group-level
errors. Version 8 adds support for querying multiple groups at a time
and nests the fields within a groups array. Add a test for the
errorCounts implementation since it varies depending on the version.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
## Changes:
- Replaced all references to boundPort with brokerBoundPort.
## Reasons
- boundPort and brokerBoundPort share the same definition and behavior.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
This patch updates the code and the dependency with the latest namespace
and version.
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Fixup PR Labels / fixup-pr-labels (needs-attention) (push) Has been cancelledDetails
Fixup PR Labels / fixup-pr-labels (triage) (push) Has been cancelledDetails
Docker Image CVE Scanner / scan_jvm (3.7.2) (push) Has been cancelledDetails
Docker Image CVE Scanner / scan_jvm (3.8.1) (push) Has been cancelledDetails
Docker Image CVE Scanner / scan_jvm (3.9.1) (push) Has been cancelledDetails
Docker Image CVE Scanner / scan_jvm (4.0.0) (push) Has been cancelledDetails
Docker Image CVE Scanner / scan_jvm (latest) (push) Has been cancelledDetails
Flaky Test Report / Flaky Test Report (push) Has been cancelledDetails
Fixup PR Labels / needs-attention (push) Has been cancelledDetails
The timeout value may be overflowed if users set a large expiration
time.
```
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 *
sessionLifetimeMs;
```
Fixed it by throwing exception if the value is overflowed.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Luke Chen <showuon@gmail.com>,
TengYao Chi <frankvicky@apache.org>
Signed-off-by: PoAn Yang <payang@apache.org>
The PR fixes following:
1. In case share partition arrive at a state which should be treated as
final state
of that batch/offset (example - LSO movement which causes offset/batch
to be ARCHIVED permanently), the result of pending write state RPCs for
that offset/batch override the ARCHIVED state. Hence track such updates
and apply when transition is completed.
2. If an acquisition lock timeout occurs while an offset/batch is
undergoing transition followed by write state RPC failure, then
respective batch/offset can
land in a scenario where the offset stays in ACQUIRED state with no
acquisition lock timeout task.
3. If a timer task is cancelled, but due to concurrent execution of
timer task and acknowledgement, there can be a scenario when timer task
has processed post cancellation. Hence it can mark the offset/batch
re-avaialble despite already acknowledged.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
<adixit@confluent.io>
The code had a mixture of "acknowledgement type" and "acknowledge type".
The latter is preferred.
Reviewers: TengYao Chi <frankvicky@apache.org>, Lan Ding
<isDing_L@163.com>
*What*
Currently when we received a top level error response in ShareFetch, we
would log the error, update the share session epoch and proceed to the
next request.
But these acknowledgements(if any) are not completed and the callback
would not have been processed.
PR aims to address this by completing these acknowledgements with the
error code from the response in this case.
Reviewers: Andrew Schofield <aschofield@confluent.io>
https://issues.apache.org/jira/browse/KAFKA-19561
Addresses a race condition during SASL reauthentication where the
server-side `KafkaChannel.send()` queues a response, but OP_WRITE is
removed before the channel becomes writable — resulting in stuck
responses and client timeouts.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
See: https://github.com/apache/kafka/pull/20168#discussion_r2227310093
add follow test case:
Given a topic with three partitions, where partition `t-2` is offline,
if partitionsToReset contains only `t-1`, the method
filterNoneLeaderPartitions incorrectly includes `t-2` in the result,
leading to a failure in the tool.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Ken Huang <s7133700@gmail.com>, Andrew
Schofield <aschofield@confluent.io>
Cleanups including:
- Rewrite `FetchCountAndOp` as a record class
- Replace `Tuple` by `Map.Entry`
Reviewers: TengYao Chi <frankvicky@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
### Problem
The connect-plugin-path tool crashes with
`UnsupportedOperationException` when processing plugins that have
loadable classes but no ServiceLoader manifest files.
### Root Cause
Line 326 attempts to remove from an immutable `Collections.emptySet()`:
```java
nonLoadableManifests.getOrDefault(pluginDesc.className(),
Collections.emptySet()).remove(pluginDesc.type());
```
### Solution
Replace `Collections.emptySet()` with `new HashSet<>()` to provide a
mutable set.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
- Use `record` where possible
- Use enhanced `switch`
- Tweak a bunch of assertions
Reviewers: Yung <yungyung7654321@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>, Dongnuo Lyu
<dlyu@confluent.io>, PoAn Yang <payang@apache.org>
All state updater threads use the same metrics instance, but do not use
unique names for their sensors. This can have the following symptoms:
1) Data inserted into one sensor by one thread can affect the metrics of
all state updater threads.
2) If one state updater thread is shutdown, the metrics associated to
all state updater threads are removed.
3) If one state updater thread is started, while another one is removed,
it can happen that a metric is registered with the `Metrics` instance,
but not associated to any `Sensor` (because it is concurrently removed),
which means that the metric will not be removed upon shutdown. If a
thread with the same name later tries to register the same metric, we
may run into a `java.lang.IllegalArgumentException: A metric named ...
already exists`, as described in the ticket.
This change fixes the bug giving unique names to the sensors. A test is
added that there is no interference of the removal of sensors and
metrics during shutdown.
Reviewers: Matthias J. Sax <matthias@confluent.io>
JIRA: [KAFKA-19563](https://issues.apache.org/jira/browse/KAFKA-19563)
Improve the add-controller doc to notify users they have to add the
configs to be passed to admin client into the controller.properties
before the improvement is done.
```
--command-config COMMAND_CONFIG
Property file containing configs to be passed to Admin Client.
For add-controller, the file is used to specify the controller
properties as well.
```
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR adds a check to the storage tool's format command which throws a
TerseFailure when the controller.quorum.voters config is defined and the
node is formatted with the --standalone flag or the
--initial-controllers flag.
Without this check, it is possible to have two voter sets. For example,
in a three node setup, the two nodes that formatted with
--no-initial-controllers could form quorum with each other since they
have the static voter set, and the --standalone node would ignore the
config and read the voter set of itself from its log, forming its own
quorum of 1.
Reviewers: José Armando García Sancio <jsancio@apache.org>, TaiJuWu
<tjwu1217@gmail.com>, Alyssa Huang <ahuang@confluent.io>
`tests/kafkatest/services/console_consumer.py` cannot be run directly.
The correct way is to run
`tests/kafkatest/sanity_checks/test_console_consumer.py`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Clarifies that the fields `LogDirDescription#totalBytes`,
`LogDirDescription#usableBytes`, and `ReplicaInfo#size` do not include
the size of remote storage by updating their corresponding docs.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
When remote storage is offline, then the segmentLag and bytesLag metrics
are not recorded. These metrics are useful to know the pending data to
upload when remote storage is down.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Kamal Chandraprakash
<kamal.chandraprakash@gmail.com>
Fixes a bug with tiered storage quota metrics introduced in [KIP-956](https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas).
The metrics tracking how much time have been spent in a throttled state
can stop reporting if a cluster stops stops doing remote copy/fetch and
the sensors go inactive.
This change delegates the job of refreshing inactive sensors to
SensorAccess. There's pretty similar logic in RLMQuotaManager which is
actually responsible for tracking and enforcing quotas and also uses a
Sensor object.
```
remote-fetch-throttle-time-avg
remote-copy-throttle-time-avg
remote-fetch-throttle-time-max
remote-copy-throttle-time-max
```
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Follow up on https://github.com/apache/kafka/pull/20241.
Delete the instruction that manually set `streams.version=1` for running
Kafka 4.1 since it is already achieved in previous setup steps.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
[KAFKA-16717](https://issues.apache.org/jira/browse/KAFKA-16717) aims to
finish the AlterShareGroupOffsets for ShareGroupCommand part.
Reviewers: Lan Ding <isDing_L@163.com>, Chia-Ping Tsai
<chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Andrew Schofield
<aschofield@confluent.io>
*What*
https://issues.apache.org/jira/browse/KAFKA-19559
- There are a few sensors(created when a `ShareConsumer` initializes)
which are not removed when the `ShareConsumer` closes.
- To maintain consistency and prevent any leaks, we have to remove all
the sensors when the consumer is closed.
This is similar to this issue for regular consumers -
https://issues.apache.org/jira/browse/KAFKA-19542
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
<apoorvmittal10@gmail.com>
The PR restricts the records being acquired post max-inflight limit.
Previously the max in-flight limit was only enforced while considering
the share partition for further fetches i.e. once the limit was reached
the share partition was not considered for further fetches. However,
when the records are actively released then there might be some records
being acquired post max-inflight limit. This is evident with higher
number of consumers reading from same share partition and releasing the
records.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lan Ding
<isDing_L@163.com>
This PR fixes an issue where the thread name shown in log messages did
not match the actual execution context. Previously, log entries
displayed the context of the newly created thread, while the logger
reflected the current executing thread. This mismatch led to confusion
and made log tracing more difficult.
Changes:
- Use logger without context to not have context
- Updated log messages to explicitly describe the thread being created
- Fixed instances where the log context reflected the current thread
instead of the newly created one
Reviewers: Matthias J. Sax <matthias@confluent.io>
Implements KIP-1195.
BrokerNotFoundException exception is unused since 2.8 Marking it
deprecated so that it can be removed in next major release.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Observers may get stuck fetching from bootstrap servers even on
discovery of a leader from a fetch response.
Observers currently fetch from bootstrap servers once their fetch
timeout expires, and even on rediscovery of the leader (e.g. observer
receives fetch response indicating location of leader), observers will
not attempt to resume fetching from the leader.
This change simplifies observer polling logic to just rely on
maybeSendFetchToBestNode which takes care of the logic of selecting
either the leader or bootstrap servers as the destination based on
timeouts and backoffs.
Reviewers: José Armando García Sancio <jsancio@apache.org>
We found that one broker's local segment on disk never get removed
forever no matter how long it stored. The disk always keep increasing.

note: Partition 2's node is the exception node.
After we trouble shooting. we find if one broker is very slow to startup
it will cause the
TopicBasedRemoteLogMetadataManager#initializeResources's fail sometime
(it meet expectation due to the server is not ready as fast). Thus it
won't stop the server so that the server still run just with some
exception log but not shutdown. It won't upload to remote for the local
so that the local segment never to deleted.
So propose the change to shutdown the broker to avoid the silence
critical error which caused the disk keep increasing forever.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke
Chen <showuon@gmail.com>
This patch mainly includes two improvements:
1. Update currentFetch when `pollForFetches()` throws an exception.
2. Add an override `KafkaShareConsumer.acknowledge(String topic, int
partition, long offset, AcknowledgeType type)` .
Reviewers: Andrew Schofield <aschofield@confluent.io>
This is a small typo I noticed while working on Connect
Original warning message
> level=WARN connector_context=The worker has been configured with one
or more internal converter properties ([internal.key.converter,
schemas.enable, internal.value.converter, schemas.enable]). Support for
these properties was deprecated in version 2.0 and removed in version
3.0, and specifying them will have no effect. Instead, an instance of
the JsonConverter with schemas.enable set to false will be used. For
more information, please visit
https://kafka.apache.org/documentation/#upgrade and consult the upgrade
notesfor the 3.0 release.
class=org.apache.kafka.connect.runtime.WorkerConfig line=310
Reviewers: Ken Huang <s7133700@gmail.com>, Andrew Schofield
<aschofield@confluent.io>
With 4.0 release, we remove pageview demo because it depends on
`:connect:json` which requires JDK 17. This PR removes the connect
dependency and adds a customized serializer and deserializer, to make
pageview demo works with JDK 11.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Optimize `RemoteLogManager#buildFilteredLeaderEpochMap` .
Add a temporary unit test `testBuildFilteredLeaderEpochMapModify` in
`RemoteLogManagerTest` to verify the output consistency of the method
before and after optimization.
Randomly generate leaderEpochs and iterate 100000 times for
verification.
```
@Test
public void testBuildFilteredLeaderEpochMapModify() {
int testIterations = 100000;
for (int i = 0; i < testIterations; i++) { TreeMap<Integer,
Long> leaderEpochToStartOffset =
generateRandomLeaderEpochAndStartOffset();
// before optimize NavigableMap<Integer, Long>
optimizeBefore =
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
// after optimize NavigableMap<Integer, Long>
optimizeAfter =
RemoteLogManager.buildFilteredLeaderEpochMap2(leaderEpochToStartOffset);
assertEquals(optimizeBefore, optimizeAfter); } }
private static TreeMap<Integer, Long>
generateRandomLeaderEpochAndStartOffset() { TreeMap<Integer,
Long> map = new TreeMap<>(); Random random = new Random();
int numEntries = random.nextInt(100000); long lastStartOffset =
0;
for (int i = 0; i < numEntries; i++) { // generate
a leader epoch int leaderEpoch = random.nextInt(100000);
long startOffset;
// generate a random start offset , or use the last start
offset if (i > 0 && random.nextDouble() < 0.2) {
startOffset = lastStartOffset; } else { startOffset =
Math.abs(random.nextLong()) % 100000; } lastStartOffset =
startOffset;
map.put(leaderEpoch, startOffset);
}
return map;
}
```
Command:
``` ./gradlew storage:test --tests RemoteLogManagerTest```
Result: All unit tests passed.
<img width="1258" height="424" alt="image"
src="https://github.com/user-attachments/assets/7d9fc3b5-3bbc-440f-b1cf-3a2a5f97557a"
/> <img width="411" height="66" alt="image"
src="https://github.com/user-attachments/assets/22a0b443-88e8-43d2-a3f2-51266935ed34"
/>
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
Add the ackWhenCommitted boolean field to the AddRaftVoter request, and
bump the RPC's version to 1.
The default value of the ackWhenCommitted field is true, and in this
case the leader will return a response after committing the VotersRecord
generated by the RPC. If ackWhenCommitted is false, the leader will
return a response after generating the new VotersRecord and adding it to
the batch accumulator.
Reviewers: Alyssa Huang <ahuang@confluent.io>, José Armando García
Sancio <jsancio@apache.org>