Rename org.apache.kafka.server:type=AssignmentsManager and
org.apache.kafka.storage.internals.log.RemoteStorageThreadPool metrics
for the consist, these metrics should be
- `kafka.log.remote:type=...`
- `kafka.server:type=...`
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Some sections are not very clear, and we need to update the
documentation.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
We are seeing cases where a Kafka Streams (KS) thread stalls for ~20
seconds. During this stall, the broker correctly aborts the open
transaction (triggered by the 10-second transaction timeout). However,
when the KS thread resumes, instead of receiving the expected
InvalidProducerEpochException (which we already handle gracefully as
part of transaction abort), the client is instead hit with an
InvalidTxnStateException. KS currently treats this as a fatal error,
causing the application to fail.
To fix this, we've added an epoch check before the verification check to
send the recoverable InvalidProducerEpochException instead of the fatal
InvalidTxnStateException. This helps safeguard both tv1 and tv2 clients
Reviewers: Justine Olshan <jolshan@confluent.io>
This MR should be couple of race conditions in RemoteIndexCacheTest.
1. There was a race condition between cache-cleanup-thread and test
thread, which wants to check that cache is gone. This was fixed with
TestUtils#waitForCondition
2. After each test we check that there is not thread leak. This check
wasn't working properly, because live of thread status is set by JVM
level, we can only set interrupted status (using private native void
interrupt0(); method under the hood), but we don't really know when JVM
will change the live status of thread. To fix this I've refactored
TestUtils#assertNoLeakedThreadsWithNameAndDaemonStatus method to use
TestUtils#waitForCondition. This fix should also affect few other tests,
which were flaky because of this check. See gradle run on
[develocity](https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.storage.internals.log.RemoteIndexCacheTest&tests.sortField=FLAKY)
After fix test were run 10000 times with repeated test annotation:
`./gradlew clean storage:test --tests
org.apache.kafka.storage.internals.log.RemoteIndexCacheTest.testCacheEntryIsDeletedOnRemoval`
... `Gradle Test Run :storage:test > Gradle Test Executor 20 >
RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval() > repetition
9998 of 10000 PASSED` `Gradle Test Run :storage:test > Gradle Test
Executor 20 > RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval()
> repetition 9999 of 10000 PASSED` `Gradle Test Run :storage:test >
Gradle Test Executor 20 > RemoteIndexCacheTest >
testCacheEntryIsDeletedOnRemoval() > repetition 10000 of 10000 PASSED`
`BUILD SUCCESSFUL in 20m 9s` `148 actionable tasks: 148 executed`
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
- **Reasons:** In this case, the `exit(int statusCode)` method invokes
`exit(statusCode, null)`, which means
the `message` argument is always `null` in this code path. As a result,
assigning
`exitMessage` has no effect and can be safely removed.
- **Changes:** Remove a redundant field assignment.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
We add the three main changes in this PR
- Disallowing null values for most LIST-type configurations makes sense,
since users cannot explicitly set a configuration to null in a
properties file. Therefore, only configurations with a default value of
null should be allowed to accept null.
- Disallowing duplicate values is reasonable, as there are currently no
known configurations in Kafka that require specifying the same value
multiple times. Allowing duplicates is both rare in practice and
potentially confusing to users.
- Disallowing empty list, even though many configurations currently
accept them. In practice, setting an empty list for several of these
configurations can lead to server startup failures or unexpected
behavior. Therefore, enforcing non-empty lists helps prevent
misconfiguration and improves system robustness.
These changes may introduce some backward incompatibility, but this
trade-off is justified by the significant improvements in safety,
consistency, and overall user experience.
Additionally, we introduce two minor adjustments:
- Reclassify some STRING-type configurations as LIST-type, particularly
those using comma-separated values to represent multiple entries. This
change reflects the actual semantics used in Kafka.
- Update the default values for some configurations to better align with
other configs.
These changes will not introduce any compatibility issues.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This is the first part of the implementation of
[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)
The purpose of this pull request is for the broker to start returning
the correct offset when it receives a -6 as a timestamp in a ListOffsets
API request.
Added unit tests for the new timestamp.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
During broker restarts, the topic-based RemoteLogMetadataManager (RLMM)
constructs the state by reading the internal `__remote_log_metadata`
topic. When the partition is not ready to perform remote storage
operations, then ReplicaNotAvailableException thrown back to the
consumer. The clients retries the request immediately.
This results in a lot of FETCH requests on the broker and utilizes the
request handler threads. Using the CountdownLatch to reduce the
frequency of ReplicaNotAvailableException thrown back to the clients.
This will improve the request handler thread usage on the broker.
Previously for one consumer, when RLMM is not ready for a partition,
then ~9K FetchConsumer requests / sec are received on the broker. With
this patch, the number of FETCH requests reduced by 95% to 600 / sec.
Reviewers: Lan Ding <isDing_L@163.com>, Satish Duggana
<satishd@apache.org>
During shutdown, when the RSM closes first, then the ongoing requests
might throw an error. To handle the ongoing requests gracefully, closing
the RSM after closing the remote-log reader thread pools.
Reviewers: Satish Duggana <satishd@apache.org>
Cleanups including:
- Rewrite `FetchCountAndOp` as a record class
- Replace `Tuple` by `Map.Entry`
Reviewers: TengYao Chi <frankvicky@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
When remote storage is offline, then the segmentLag and bytesLag metrics
are not recorded. These metrics are useful to know the pending data to
upload when remote storage is down.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Kamal Chandraprakash
<kamal.chandraprakash@gmail.com>
Fixes a bug with tiered storage quota metrics introduced in [KIP-956](https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas).
The metrics tracking how much time have been spent in a throttled state
can stop reporting if a cluster stops stops doing remote copy/fetch and
the sensors go inactive.
This change delegates the job of refreshing inactive sensors to
SensorAccess. There's pretty similar logic in RLMQuotaManager which is
actually responsible for tracking and enforcing quotas and also uses a
Sensor object.
```
remote-fetch-throttle-time-avg
remote-copy-throttle-time-avg
remote-fetch-throttle-time-max
remote-copy-throttle-time-max
```
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
We found that one broker's local segment on disk never get removed
forever no matter how long it stored. The disk always keep increasing.

note: Partition 2's node is the exception node.
After we trouble shooting. we find if one broker is very slow to startup
it will cause the
TopicBasedRemoteLogMetadataManager#initializeResources's fail sometime
(it meet expectation due to the server is not ready as fast). Thus it
won't stop the server so that the server still run just with some
exception log but not shutdown. It won't upload to remote for the local
so that the local segment never to deleted.
So propose the change to shutdown the broker to avoid the silence
critical error which caused the disk keep increasing forever.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke
Chen <showuon@gmail.com>
Optimize `RemoteLogManager#buildFilteredLeaderEpochMap` .
Add a temporary unit test `testBuildFilteredLeaderEpochMapModify` in
`RemoteLogManagerTest` to verify the output consistency of the method
before and after optimization.
Randomly generate leaderEpochs and iterate 100000 times for
verification.
```
@Test
public void testBuildFilteredLeaderEpochMapModify() {
int testIterations = 100000;
for (int i = 0; i < testIterations; i++) { TreeMap<Integer,
Long> leaderEpochToStartOffset =
generateRandomLeaderEpochAndStartOffset();
// before optimize NavigableMap<Integer, Long>
optimizeBefore =
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
// after optimize NavigableMap<Integer, Long>
optimizeAfter =
RemoteLogManager.buildFilteredLeaderEpochMap2(leaderEpochToStartOffset);
assertEquals(optimizeBefore, optimizeAfter); } }
private static TreeMap<Integer, Long>
generateRandomLeaderEpochAndStartOffset() { TreeMap<Integer,
Long> map = new TreeMap<>(); Random random = new Random();
int numEntries = random.nextInt(100000); long lastStartOffset =
0;
for (int i = 0; i < numEntries; i++) { // generate
a leader epoch int leaderEpoch = random.nextInt(100000);
long startOffset;
// generate a random start offset , or use the last start
offset if (i > 0 && random.nextDouble() < 0.2) {
startOffset = lastStartOffset; } else { startOffset =
Math.abs(random.nextLong()) % 100000; } lastStartOffset =
startOffset;
map.put(leaderEpoch, startOffset);
}
return map;
}
```
Command:
``` ./gradlew storage:test --tests RemoteLogManagerTest```
Result: All unit tests passed.
<img width="1258" height="424" alt="image"
src="https://github.com/user-attachments/assets/7d9fc3b5-3bbc-440f-b1cf-3a2a5f97557a"
/> <img width="411" height="66" alt="image"
src="https://github.com/user-attachments/assets/22a0b443-88e8-43d2-a3f2-51266935ed34"
/>
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
Improve the error handling while building the remote-log-auxiliary state
when a follower node with an empty disk begin to synchronise with the
leader. If the topic has remote storage enabled, then the
ReplicaFetcherThread attempt to build the remote-log-auxiliary state.
Note that the remote-log-auxiliary state gets invoked only when the
leader-log-start-offset is non-zero and leader-log-start-offset is not
equal to leader-local-log-start-offset.
When the LeaderAndISR request is received, then the
ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' initially,
followed by the RemoteLogManager#onLeadershipChange call. As a result,
when ReplicaFetcherThread initiates the
RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not
have been initialized at that time and throws retriable exception.
Introduced RetriableRemoteStorageException to gracefully handle the
error.
After the patch:
```
[2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=1,
fetcherId=0] Could not build remote log auxiliary state for orange-1 due
to error: RemoteLogManager is not ready for partition: orange-1
(kafka.server.ReplicaFetcherThread)
[2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=2,
fetcherId=0] Could not build remote log auxiliary state for orange-0 due
to error: RemoteLogManager is not ready for partition: orange-0
(kafka.server.ReplicaFetcherThread)
```
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
- startConsumerThread is always true so removed the variable.
- Replaced the repetitive lock handling logic with
`withReadLockAndEnsureInitialized` to reduce duplication and improve
readability.
- Consolidated the logic in `initializeResources` and. simplified method
arguments to better encapsulate configuration.
- Extracted common code and reduced the usage of global variables.
- Named the variables properly.
Tests:
- Existing UTs since this patch refactored the code.
Reviewers: PoAn Yang <payang@apache.org>
Changes: Rename `waitForTopic` to `waitTopicCreation` for better clarity
Reasons: To align with `waitTopicDeletion` Reference:
https://github.com/apache/kafka/pull/20108/files#r2221659660
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<frankvicky@apache.org>
The comment on the RemoteLogManager.getLeaderEpochEntries method has a
small error description,it should be start(inclusive)and end(exclusive).
Reviewers: Ken Huang <s7133700@gmail.com>, Lan Ding <isDing_L@163.com>,
Chia-Ping Tsai <chia7712@gmail.com>
This PR performs a refactoring of LockUtils and improves inline
comments, as a follow-up to https://github.com/apache/kafka/pull/19961.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
**Problem Description**
In the `RemoteIndexCache.cleanup()` method, the asynchronous invocation
of `index.deleteIfExists()` may cause a conflict. When the
`getIndexFileFromRemoteCacheDir()` method is executed, it utilizes
`Files.walk()` to traverse all files in the directory path. If
`index.deleteIfExists()` is triggered during this traversal, a
`NoSuchFileException` will be thrown.
**Solution**
To resolve this issue, ensure that `index.deleteIfExists()` has been
fully executed before invoking `getIndexFileFromRemoteCacheDir()`.
Reviewers: Jun Rao <junrao@gmail.com>
This PR enables reading remote storage for multiple partitions in one
fetchRequest. The main changes are:
1. In `DelayedRemoteFetch`, we accept multiple remoteFetchTasks and
other metadata now.
2. In `DelayedRemoteFetch`, we'll wait until all remoteFetch done,
either succeeded or failed.
3. In `ReplicaManager#fetchMessage`, we'll create one
`DelayedRemoteFetch` and pass multiple remoteFetch metadata to it, and
watch all of them.
4. Added tests
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, Satish Duggana <satishd@apache.org>
- Fix testUncommittedDataNotConsumedFrequentSegmentRolls() and
testUncommittedDataNotConsumed(), which call createLog() but never close
the log when the tests complete.
- Move LogConcurrencyTest to the Storage module and rewrite it in Java.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
#5608 introduced a regression where the check for `targetOffset <
log.highWatermark`
to emit a `WARN` log was made incorrectly after truncating the log.
This change moves the check for `targetOffset < log.highWatermark` to
`UnifiedLog#truncateTo` and ensures we emit a `WARN` log on truncation
below the replica's HWM by both the `ReplicaFetcherThread` and
`ReplicaAlterLogDirsThread`
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
https://issues.apache.org/jira/browse/KAFKA-19390
The AbstractIndex.resize() method does not release the old memory map
for both index and time index files. In some cases, Mixed GC may not
run for a long time, which can cause the broker to crash when the
vm.max_map_count limit is reached.
The root cause is that safeForceUnmap() is not being called on Linux
within resize(), so we have changed the code to unmap old mmap on all
operating systems.
The same problem was reported in
[KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the
PR submitted at that time did not acquire all necessary locks around the
mmap accesses and was closed without fixing the issue.
Reviewers: Jun Rao <junrao@gmail.com>
When writing HTML, it's recommended to use the <code> element instead of
backticks for inline code formatting.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
* The CREATE_TOPIC request gets issued only when it is clear that the
topic does not exist in the cluster.
* When the request to describe the topic gets timed-out or any exception
thrown other than UnknownTopicOrPartitionException, then the same gets
re-thrown and the describe/create topic request gets retried in the next
iteration until the initializationRetryMaxTimeoutMs gets breached.
Fixes: https://issues.apache.org/jira/browse/KAFKA-19371
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash
<kamal.chandraprakash@gmail.com>
---------
Co-authored-by: stroller.fu <stroller.fu@zoom.us>
## Summary
- Fix potential race condition in
LogSegment#readMaxTimestampAndOffsetSoFar(), which may result in
non-monotonic offsets and causes replication to stop.
- See https://issues.apache.org/jira/browse/KAFKA-19407 for the details
how it happen.
Reviewers: Vincent PÉRICART <mauhiz@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Description
* Replace `org.apache.kafka.common.test.TestUtils` with
`org.apache.kafka.test.TestUtils` in outer package modules to
standardize test utility usage
* Move `waitUntilLeaderIsElectedOrChangedWithAdmin` method from
`org.apache.kafka.test.TestUtils` to `ClusterInstance` and refactor for
better code organization
* Add `org.apache.kafka.test.TestUtils` dependency to
`transaction-coordinator` import control
Reviewers: PoAn Yang [payang@apache.org](mailto:payang@apache.org), Ken
Huang [s7133700@gmail.com](mailto:s7133700@gmail.com), Ken Huang
[s7133700@gmail.com](mailto:s7133700@gmail.com), Chia-Ping Tsai
[chia7712@gmail.com](mailto:chia7712@gmail.com)
BTW: whether we should rename
`ReplicaManager#updateLeaderAndFollowerMetrics`
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <kitingiao@gmail.com>, Lan Ding
<isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
Deprecate the `remote.log.manager.thread.pool.size` configuration and introduce a new dynamic configuration:
`remote.log.manager.follower.thread.pool.size`.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>
Simplify Set initialization and reduce the overhead of creating extra
collections.
The changes mostly include:
- new HashSet<>(List.of(...))
- new HashSet<>(Arrays.asList(...)) / new HashSet<>(asList(...))
- new HashSet<>(Collections.singletonList()) / new
HashSet<>(singletonList())
- new HashSet<>(Collections.emptyList())
- new HashSet<>(Set.of())
This change takes the following into account, and we will not change to
Set.of in these scenarios:
- Require `mutability` (UnsupportedOperationException).
- Allow `duplicate` elements (IllegalArgumentException).
- Allow `null` elements (NullPointerException).
- Depend on `Ordering`. `Set.of` does not guarantee order, so it could
make tests flaky or break public interfaces.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Log segment closure results in right sizing the segment on disk along
with the associated index files.
This is specially important for TimeIndexes where a failure to right
size may eventually cause log roll failures leading to under replication
and log cleaner failures.
This change uses `Utils.closeAll` which propagates exceptions, resulting
in an "unclean" shutdown. That would then cause the broker to attempt to
recover the log segment and the index on next startup, thereby avoiding
the failures described above.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This change handles rejecting non-zero sequences when there is an empty
producerIDState with TV2. The scenario will be covered with the
re-triable OutOfOrderSequence error.
For Transactions V2 with empty state: ✅ Allow only sequence 0 is allowed for
new producers or after state cleanup (new validation added) ❌ Don't allow any
non-zero sequence is rejected with our specific error message ❌ Don't allow any epoch
bumps still require sequence 0 (existing validation remains)
For Transactions V1 with empty state: ✅ Allow ANY sequence number is allowed
(0, 5, 100, etc.) ❌ Don't allow epoch bumps still require sequence 0 (existing
validation)
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
With the transaction V2, replica manager checks whether the incoming
producer request produces to a partition belonging to a transaction.
ReplicaManager figures this out by checking the producer epoch stored in
the partition log. However, the current code does not reject the produce
request if its producer epoch is lower than the stored producer epoch.
It is an optimization to reject such requests earlier instead of sending
an AddPartitionToTxn request and getting rejected in the response.
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
The main issue was that we forgot to set
`TopicConfig.SEGMENT_BYTES_CONFIG` to at least `1024 * 1024`, which
caused problems in tests with small segment sizes.
To address this, we introduced a new internal config:
`LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG`, allowing us to set smaller
segment bytes specifically for testing purposes.
We also updated the logic so that if a user configures the topic-level
segment bytes without explicitly setting the internal config, the
internal value will no longer be returned to the user.
In addition, we removed
`MetadataLogConfig#METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG` and added
three new internal configurations:
- `INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG`
- `INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG`
- `INTERNAL_DELETE_DELAY_MILLIS_CONFIG`
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR rewrites `RemoteTopicCrudTest` in Java using the `@ClusterTest`
framework and moves it to the `storage` module.
**Note:** Two test cases have not yet been migrated
- `testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic`
-
`testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled`
These tests rely on modifying broker configs during the test lifecycle,
which `ClusterTest` currently does not support. They will be migrated in
a follow-up PR after
[#16808](https://github.com/apache/kafka/pull/16808) is merged, which
introduces support for config updates in `ClusterTest`.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Replaces the UNIFIED_LOG_UNKNOWN_OFFSET constant in LogOffsetMetadata
with UnifiedLog.UNKNOWN_OFFSET.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, YuChia Ma <minecraftmiku831@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
In FETCH requests and TXN_OFFSET_COMMIT requests, on current trunk we
run into a race condition inside UnifiedLog, causing a
`NoSuchElementException` in
`UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:651)`.
The cause is that the line a performing an `isPresent` check on a
volatile Optional before accessing it in `get`, leaving the door open to
a race condition when the optional changes between `isPresent` and
`get`. This change takes a copy of the volatile variable first.
Follow up https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add a warning message when using log.cleaner.enable to remind users that
this configuration is deprecated. Also, add a warning message for
log.cleaner.threads=0 because in version 5.0, the value must be greater
than zero.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <frankvicky@apache.org>, TaiJuWu
<tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>