## Summary
- Fix a potential race condition in
LogSegment#readMaxTimestampAndOffsetSoFar(), which may result in
non-monotonic offsets and cause replication to stop.
- See https://issues.apache.org/jira/browse/KAFKA-19407 for the details
of how it happens.
Reviewers: Vincent PÉRICART <mauhiz@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Log segment closure results in right-sizing the segment on disk along
with the associated index files.
This is especially important for TimeIndexes, where a failure to right-size
may eventually cause log roll failures, leading to under-replication
and log cleaner failures.
This change uses `Utils.closeAll` which propagates exceptions, resulting
in an "unclean" shutdown. That would then cause the broker to attempt to
recover the log segment and the index on next startup, thereby avoiding
the failures described above.
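A minimal sketch of the closing pattern, with placeholder Closeable arguments standing in for the segment's log and index resources (`Utils.closeAll` is the real Kafka utility; the surrounding method is illustrative):
```java
import java.io.Closeable;
import java.io.IOException;

import org.apache.kafka.common.utils.Utils;

// Illustrative only: close the segment's resources together. Utils.closeAll
// attempts to close every argument and rethrows the first failure, so an index
// that fails to right-size surfaces as an IOException instead of being swallowed.
final class SegmentCloseSketch {
    static void closeSegment(Closeable log, Closeable offsetIndex,
                             Closeable timeIndex, Closeable txnIndex) throws IOException {
        Utils.closeAll(log, offsetIndex, timeIndex, txnIndex);
    }
}
```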
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This change rejects non-zero sequence numbers when there is an empty
producerIDState with TV2. The scenario is surfaced as a retriable
OutOfOrderSequence error; a rough sketch of the new check follows the
summary below.
For Transactions V2 with empty state:
- ✅ Only sequence 0 is allowed for new producers or after state cleanup (new validation added)
- ❌ Any non-zero sequence is rejected with our specific error message
- ❌ Epoch bumps still require sequence 0 (existing validation remains)
For Transactions V1 with empty state:
- ✅ ANY sequence number is allowed (0, 5, 100, etc.)
- ❌ Epoch bumps still require sequence 0 (existing validation)
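A rough, self-contained sketch of the new check, under stated assumptions (the helper and its parameters are illustrative, not the actual broker code; only `OutOfOrderSequenceException` is the real Kafka class):
```java
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

// Illustrative sketch: with transactions V2 and no existing producer state,
// only sequence 0 is accepted; V1 keeps accepting any sequence here.
final class EmptyStateSequenceCheck {
    static void validate(boolean transactionV2Enabled, boolean producerStateIsEmpty, int firstSequence) {
        if (transactionV2Enabled && producerStateIsEmpty && firstSequence != 0) {
            // Retriable: the producer can retry after resetting to sequence 0.
            throw new OutOfOrderSequenceException("Invalid sequence " + firstSequence
                + " for a producer with no state; expected sequence 0");
        }
    }
}
```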
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
With transactions V2, the replica manager checks whether an incoming
produce request writes to a partition belonging to a transaction.
ReplicaManager figures this out by checking the producer epoch stored in
the partition log. However, the current code does not reject the produce
request if its producer epoch is lower than the stored producer epoch.
It is an optimization to reject such requests earlier instead of sending
an AddPartitionsToTxn request and getting rejected in the response.
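A hedged sketch of the early rejection (names are illustrative; only `Errors.INVALID_PRODUCER_EPOCH` is the real protocol error):
```java
import org.apache.kafka.common.protocol.Errors;

// Illustrative: compare the request's producer epoch against the epoch already
// recorded in the partition log and fail fast on a stale epoch, instead of
// issuing AddPartitionsToTxn and being rejected in that response.
final class ProducerEpochCheck {
    static Errors validate(short requestProducerEpoch, short storedProducerEpoch) {
        if (requestProducerEpoch < storedProducerEpoch)
            return Errors.INVALID_PRODUCER_EPOCH;
        return Errors.NONE;
    }
}
```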
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
The main issue was that we forgot to set
`TopicConfig.SEGMENT_BYTES_CONFIG` to at least `1024 * 1024`, which
caused problems in tests with small segment sizes.
To address this, we introduced a new internal config:
`LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG`, allowing us to set smaller
segment bytes specifically for testing purposes.
We also updated the logic so that if a user configures the topic-level
segment bytes without explicitly setting the internal config, the
internal value will no longer be returned to the user.
In addition, we removed
`MetadataLogConfig#METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG` and added
three new internal configurations:
- `INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG`
- `INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG`
- `INTERNAL_DELETE_DELAY_MILLIS_CONFIG`
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR rewrites `RemoteTopicCrudTest` in Java using the `@ClusterTest`
framework and moves it to the `storage` module.
**Note:** Two test cases have not yet been migrated:
- `testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic`
-
`testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled`
These tests rely on modifying broker configs during the test lifecycle,
which `ClusterTest` currently does not support. They will be migrated in
a follow-up PR after
[#16808](https://github.com/apache/kafka/pull/16808) is merged, which
introduces support for config updates in `ClusterTest`.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Replaces the UNIFIED_LOG_UNKNOWN_OFFSET constant in LogOffsetMetadata
with UnifiedLog.UNKNOWN_OFFSET.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, YuChia Ma <minecraftmiku831@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
In FETCH requests and TXN_OFFSET_COMMIT requests, on current trunk we
run into a race condition inside UnifiedLog, causing a
`NoSuchElementException` in
`UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:651)`.
The cause is that the code performs an `isPresent` check on a volatile
Optional and then accesses it with `get`, leaving the door open to a
race condition when the Optional changes between `isPresent` and `get`.
This change takes a copy of the volatile variable first.
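A minimal sketch of the fix pattern, assuming an illustrative field and method (not the exact UnifiedLog code):
```java
import java.util.Optional;

// Copy the volatile Optional into a local before checking and dereferencing it,
// so a concurrent update cannot land between isPresent() and get().
final class LastStableOffsetSketch {
    private volatile Optional<Long> firstUnstableOffset = Optional.empty();

    long lastStableOffset(long highWatermark) {
        Optional<Long> unstable = firstUnstableOffset; // single volatile read
        return unstable.isPresent() ? unstable.get() : highWatermark;
    }
}
```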
Follow up https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add a warning message when using log.cleaner.enable to remind users that
this configuration is deprecated. Also, add a warning message for
log.cleaner.threads=0 because in version 5.0, the value must be greater
than zero.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <frankvicky@apache.org>, TaiJuWu
<tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
We updated the validation rule for cleanup.policy in remote storage
mode.
If remote log storage is enabled, only cleanup.policy=delete is allowed.
Any other value (e.g. compact, compact,delete) will now result in a
config validation error.
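A hedged sketch of the tightened rule (the helper shape is illustrative; the constant and exception are real Kafka classes):
```java
import java.util.List;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;

// Illustrative: with remote log storage enabled, any policy other than exactly
// "delete" fails validation.
final class CleanupPolicyValidationSketch {
    static void validate(boolean remoteLogStorageEnabled, List<String> cleanupPolicy) {
        if (remoteLogStorageEnabled
                && !List.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(cleanupPolicy)) {
            throw new InvalidConfigurationException(
                "When remote log storage is enabled, cleanup.policy must be 'delete', got: " + cleanupPolicy);
        }
    }
}
```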
Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang
<s7133700@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
The fallback mechanism for `remote.log.manager.copier.thread.pool.size`
and `remote.log.manager.expiration.thread.pool.size` defaulting to
`remote.log.manager.thread.pool.size` was introduced in KIP-950. This
approach was abandoned in KIP-1030, where default values were changed
from -1 to 10, and a configuration validator enforcing a minimum value
of 1 was added. As a result, this commit removes the fallback mechanism
from `RemoteLogManagerConfig.java` to align with the new defaults and
validation.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
[As discussed in the mailing
list](https://lists.apache.org/thread/m03mpkm93737kk6d1nd6fbv9wdgsrhv9),
the broker only fetches remote data for ONE partition in a given
FetchRequest. In other words, if a consumer sends a FetchRequest
requesting 50 topic-partitions, and each partition's requested offset is
not stored locally - the broker will fetch and respond with just one
partition's worth of data from the remote store, and the rest will be
empty.
Given that our defaults are 50 MiB for the total fetch response and
1 MiB per partition, this can limit throughput. This patch documents the
behavior in three configs: `fetch.max.bytes`, `max.partition.fetch.bytes`,
and `remote.fetch.max.wait.ms`.
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash
<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>
Add a wait for cleaner thread shutdown in `testCleanerThreadShutdown` to
eliminate flakiness. After calling `cache.close()`, the test now uses
`TestUtils.waitForCondition` to poll until the background
“remote-log-index-cleaner” thread has fully exited before asserting that
no cleaner threads remain. This ensures the asynchronous shutdown always
completes before the final assertions.
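A sketch of the added wait, under the assumption that the test polls live thread names (the helper is illustrative; `TestUtils.waitForCondition` is the real test utility):
```java
import org.apache.kafka.test.TestUtils;

// Illustrative: after cache.close(), block until no live thread still carries
// the cleaner's name before running the final assertions.
final class CleanerShutdownWaitSketch {
    static void awaitCleanerShutdown() throws InterruptedException {
        TestUtils.waitForCondition(
            () -> Thread.getAllStackTraces().keySet().stream()
                    .noneMatch(t -> t.isAlive() && t.getName().contains("remote-log-index-cleaner")),
            "Timed out waiting for the remote-log-index-cleaner thread to exit");
    }
}
```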
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The remote storage reader thread pool uses the same count for both the
maximum and the core pool size. If users adjust the pool size to a value
larger than the original, it throws `IllegalArgumentException`. Updated
both values to fix the issue.
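A minimal, self-contained illustration of the failure mode and the fix, using made-up sizes (not the actual remote storage reader pool code):
```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ReaderPoolResizeSketch {
    // Growing only the core size is rejected once it exceeds the maximum size,
    // so both values are updated, maximum first.
    static void grow(ThreadPoolExecutor pool, int newSize) {
        pool.setMaximumPoolSize(newSize);
        pool.setCorePoolSize(newSize);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            8, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // pool.setCorePoolSize(16);  // would throw IllegalArgumentException (core > max)
        grow(pool, 16);
        pool.shutdown();
    }
}
```
Shrinking would need the opposite order (core size first) to keep core ≤ max at every step.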
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
1. Remove `RemoteLogManager#startup` and
`RemoteLogManager#onEndpointCreated`
2. Move endpoint creation to `BrokerServer`
3. Move `RemoteLogMetadataManager#configure` and
`RemoteLogStorageManager#configure` to RemoteLogManager constructor
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
For segments that are uploaded to remote storage, RemoteIndexCache caches
the fetched offset, timestamp, and transaction index entries on the first
invocation to remote; subsequent invocations are served from the local
cache.
The remote indexes that are cached locally get removed in two cases:
1. The remote segments are deleted due to a breach of the retention
size/time or the start offset.
2. The number of cached indexes exceeds the remote-log-index-cache size
limit of 1 GB (default).
There are two layers of locks used in the RemoteIndexCache: a first-layer
lock on the RemoteIndexCache and a second-layer lock on the
RemoteIndexCache#Entry.
**Issue**
1. The first layer of locking coordinates the remote-log reader and deleter
threads. To ensure that the reader and deleter threads do not block each
other, we only take `lock.readLock()` when accessing/deleting the cached
index entries.
2. The issue happens when both the reader and deleter threads take the
readLock, and the deleter thread then marks the index as
`markedForCleanup`. The reader thread, which holds the `indexEntry`,
gets an IllegalStateException when accessing it.
3. This is a concurrency issue, where we mark the entry as
`markedForCleanup` before removing it from the cache. See the
RemoteIndexCache#remove and RemoteIndexCache#removeAll methods.
4. When an entry gets evicted from the cache due to a breach of the 1 GB
maxSize, the cache removes that entry before calling the evictionListener,
and all the operations are performed atomically by the caffeine cache.
**Solution**
1. When the deleter thread marks an Entry for deletion, we rename the
underlying index files with a ".deleted" suffix and add a job to the
remote-log-index-cleaner thread, which performs the actual cleanup
(a sketch of the rename follows the list below). Previously, the indexes
were not accessible once marked for deletion. Now, we allow access to
those renamed files (from an entry that is about to be removed and is
still held by a reader thread) until the relevant files are removed from
disk.
2. Similar to local-log index/segment deletion, once the files get
renamed with the ".deleted" suffix, the actual deletion happens after
the `file.delete.delay.ms` delay of 1 minute. The renamed index files
get deleted after 30 seconds.
3. During this time, if the same index entry gets fetched again from
remote, it does not conflict with the deleted entry because the file
names are different.
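Illustrative sketch only (the helper is a placeholder, not the RemoteIndexCache code): mark an index file for deletion by renaming it with the ".deleted" suffix, leaving the renamed file readable until the cleaner thread removes it from disk.
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class MarkForCleanupSketch {
    // Rename foo.index -> foo.index.deleted; readers holding the entry can keep
    // using the renamed file until the cleaner thread deletes it.
    static Path markForCleanup(Path indexFile) throws IOException {
        Path renamed = indexFile.resolveSibling(indexFile.getFileName() + ".deleted");
        return Files.move(indexFile, renamed);
    }
}
```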
Reviewers: Satish Duggana <satishd@apache.org>
Pretty much a straightforward move of these classes. I just updated
`RemoteLogManagerTest` to not use `KafkaConfig`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
We need to add a SuppressWarnings annotation because `log.cleaner.enable`
is marked deprecated.
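A minimal, self-contained illustration of the pattern (the constant holder below is a stand-in, not the exact Kafka class that defines the property):
```java
import java.util.Properties;

/** Stand-in for the class that now marks the property deprecated. */
class LegacyCleanerConfig {
    @Deprecated
    static final String LOG_CLEANER_ENABLE_PROP = "log.cleaner.enable";
}

final class CleanerConfigSuppressionSketch {
    // Referencing the deprecated constant from another class triggers a
    // deprecation warning, which the annotation suppresses.
    @SuppressWarnings("deprecation")
    static Properties withCleanerDisabled(Properties props) {
        props.put(LegacyCleanerConfig.LOG_CLEANER_ENABLE_PROP, "false");
        return props;
    }
}
```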
Reviewers: PoAn Yang <payang@apache.org>, Kuan-Po Tseng
<brandboat@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
JIRA: KAFKA-13610 This patch deprecates the `log.cleaner.enable`
configuration. It's part of
[KIP-1148](https://cwiki.apache.org/confluence/x/XAyWF).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
<payang@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>
Move LogCleaner and related classes to storage module and rewrite in
Java.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Jun Rao <junrao@gmail.com>
jira: https://issues.apache.org/jira/browse/KAFKA-19076
The message is used when the function encounters an error, so the error
message should be created lazily.
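A short sketch of the lazy-message pattern under stated assumptions (the helper names are illustrative):
```java
import java.util.function.Supplier;

final class LazyErrorMessageSketch {
    // Build the error string only on the failure path.
    static void check(boolean condition, Supplier<String> message) {
        if (!condition)
            throw new IllegalStateException(message.get());
    }

    static void example(long offset, long logStartOffset) {
        // The concatenation inside the lambda runs only if the check fails.
        check(offset >= logStartOffset,
            () -> "Offset " + offset + " is below the log start offset " + logStartOffset);
    }
}
```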
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Given that we now support Java 17 on our brokers, this PR replaces the
use of the following in the storage module (a short before/after sketch
follows the list):
- Collections.singletonList() and Collections.emptyList() with List.of()
- Collections.singletonMap() and Collections.emptyMap() with Map.of()
- Collections.singleton() and Collections.emptySet() with Set.of()
- Arrays.asList() with List.of()
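The replacements in a before/after flavour (values are placeholders); note that the `List.of`/`Map.of`/`Set.of` factories return immutable collections and reject null elements, unlike `Arrays.asList`:
```java
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CollectionFactorySketch {
    static final List<String> TOPICS       = List.of("topic-a");    // was Collections.singletonList("topic-a")
    static final Map<String, Long> OFFSETS = Map.of("topic-a", 0L); // was Collections.singletonMap("topic-a", 0L)
    static final Set<Integer> BROKERS      = Set.of();              // was Collections.emptySet()
    static final List<Integer> IDS         = List.of(1, 2, 3);      // was Arrays.asList(1, 2, 3)
}
```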
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
There are two changes in this PR.
1. Move `BrokerCompressionTest` from core to storage
2. Rewrite `BrokerCompressionTest` from Scala to Java
Reviewers: TengYao Chi <kitingiao@gmail.com>, PoAn Yang
<payang@apache.org>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The ZooKeeper mode was removed in 4.0, so the test cases no longer need
to specify the quorum. The following variables and functions can be replaced:
- TestWithParameterizedQuorumAndGroupProtocolNames
- getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
- getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly
- getTestQuorumAndGroupProtocolParametersAll
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Move LogCleanerManager and related classes to storage module and rewrite
in Java.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao
<junrao@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
Reverts commits 2723dbf3a0 and 269e8892ad.
Instead of reopening the transaction index, it cancels the
RemoteFetchTask without interrupting it, which avoids closing the
TransactionIndex channel.
This lets the remote fetch complete its execution while its results are
ignored. Given that this is considered a rare case, we can live with
this. If it becomes a performance issue, it could be optimized.
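A sketch of the pattern (the task variable is illustrative): cancelling without `mayInterruptIfRunning` lets an already-running remote read finish with its TransactionIndex channel intact, while its result is simply never consumed.
```java
import java.util.concurrent.Future;

final class RemoteFetchCancellationSketch {
    static void cancelWithoutInterrupting(Future<?> remoteFetchTask) {
        // false = do not interrupt a running task; a task that already started
        // runs to completion and the caller ignores its result.
        remoteFetchTask.cancel(false);
    }
}
```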
Reviewers: Jun Rao <junrao@gmail.com>
The `lastOffset` includes the entire batch header, so we should check `baseOffset` instead.
To optimize this, we need to update the search logic. The previous
approach simply checked whether each batch's `lastOffset()` was greater
than or equal to the target offset. Once it found the first batch that
met this condition, it returned that batch immediately.
Now that we are using `baseOffset()`, we need to handle a special case:
if the `targetOffset` falls between the `lastOffset` of the previous
batch and the `baseOffset` of the matching batch, we should select the
matching batch. The updated logic is structured as follows:
1. First, if `baseOffset` exactly equals `targetOffset`, return immediately.
2. If we find the first batch with `baseOffset` greater than `targetOffset`:
   - If the previous batch contains the target, return the previous batch.
   - If there is no previous batch, or the previous batch does not contain
   the target, return the current batch.
3. After iterating through all batches, check if the last batch contains
the target offset.
This code path is not thread-safe, so we need to guard against
`EOFException`. To avoid this exception, I am still using an early
return. In this scenario, `lastOffset` is still used within the loop,
but it is executed at most once per search. In most cases, this results
in an optimization.
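A simplified, self-contained sketch of the search described above (structure and names are illustrative, not the exact Kafka implementation):
```java
import java.util.Optional;

import org.apache.kafka.common.record.RecordBatch;

final class BatchSearchSketch {
    static Optional<RecordBatch> searchForOffset(Iterable<? extends RecordBatch> batches, long targetOffset) {
        RecordBatch previous = null;
        for (RecordBatch batch : batches) {
            if (batch.baseOffset() == targetOffset)
                return Optional.of(batch);            // exact match, no lastOffset() read needed
            if (batch.baseOffset() > targetOffset) {
                // lastOffset() is read at most once, and only for the previous batch
                if (previous != null && previous.lastOffset() >= targetOffset)
                    return Optional.of(previous);     // target lies inside the previous batch
                return Optional.of(batch);            // target lies in the gap before this batch
            }
            previous = batch;
        }
        // The target may still be inside the last batch
        if (previous != null && previous.lastOffset() >= targetOffset)
            return Optional.of(previous);
        return Optional.empty();
    }
}
```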
Test: Verifying Memory Usage Improvement
To evaluate whether this optimization helps, I followed the steps below
to monitor memory usage:
1. Start a Standalone Kafka Server
```sh
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties
```
2. Use Performance Console Tools to Produce and Consume Records
**Produce Records:**
```sh
./kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000000 \
--record-size 100 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
```
**Consume Records:**
```sh
./bin/kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000000 \
--bootstrap-server localhost:9092
```
It can be observed that memory usage has significantly decreased.
trunk: (memory usage screenshot)
this PR: (memory usage screenshot)
Reviewers: Kirk True <kirk@kirktrue.pro>, TengYao Chi
<kitingiao@gmail.com>, David Arthur <mumrah@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Mark the following tests as flaky:
* StickyAssignorTest > testLargeAssignmentAndGroupWithUniformSubscription
* DeleteSegmentsByRetentionTimeTest
* QuorumControllerTest > testUncleanShutdownBrokerElrEnabled
Reviewers: Andrew Schofield <aschofield@confluent.io>
The PR fixes the behaviour when the fetched records are larger than the
`fetch.max.bytes` config.
The `hardMaxBytesLimit` flag is used in ReplicaManager, where it decides
whether to fetch a single record or not. The file records get sliced
based on the bytes requested. However, if `hardMaxBytesLimit` is false,
then at least one record is fetched and the bytes are adjusted
accordingly in `localLog`.
Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit <adixit@confluent.io>
Recently, we found a regression that could have been detected by static
analysis, since a local variable wasn't being passed to a method during
a refactoring, and was left unused. It was fixed in
[7a749b5](7a749b589f),
but almost slipped into 4.0. Unused variables are typically detected by
IDEs, but this is insufficient to prevent these kinds of bugs. This
change enables unused local variable detection in checkstyle for Kafka.
A few notes on the usage:
- There are two situations in which people actually want to have a local
variable but not use it. First, there are `for (Type ignored:
collection)` loops which have to loop `collection.length` number of
times, but that do not use `ignored` in the loop body. These are
typically still easier to read than a classical `for` loop. Second, some
IDEs detect it if a return value of a function such as `File.delete` is
not being used. In this case, people sometimes store the result in an
unused local variable to make ignoring the return value explicit and to
avoid the squiggly lines.
- In Java 22, unused local variables can be replaced with a single
underscore `_`. This is supported by checkstyle. In pre-22 versions,
IntelliJ allows such variables to be named `ignored` to suppress the
unused local variable warning. This pattern is often (but not
consistently) used in the Kafka codebase. This is, however, not
supported by checkstyle.
Since we cannot switch to Java 22 yet, and we want automated detection
using checkstyle, we have to resort to annotating the unused local
variables with `@SuppressWarnings("UnusedLocalVariable")`. We have to
apply this in 11 cases across the Kafka codebase. While not pretty, I'd
argue it's worth it to prevent bugs like the one fixed in
[7a749b5](7a749b589f).
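The two situations above, sketched with the suppression the build now requires (these call sites are illustrative, not the 11 real ones):
```java
import java.io.File;
import java.util.List;

final class UnusedLocalVariableSketch {
    static int countEntries(List<String> entries) {
        int count = 0;
        // The loop body never reads the element, only the iteration count matters.
        for (@SuppressWarnings("UnusedLocalVariable") String ignored : entries) {
            count++;
        }
        return count;
    }

    static void deleteQuietly(File file) {
        // Storing the result makes ignoring File.delete()'s return value explicit.
        @SuppressWarnings("UnusedLocalVariable")
        boolean deleted = file.delete();
    }
}
```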
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur
<mumrah@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bruno
Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>
A cached TransactionIndex may get closed if interrupted, causing subsequent calls to always fail with ClosedChannelException and forcing the process to be restarted. To avoid this issue, TransactionIndex exposes a new method to validate the state of the channel, and the index is reopened if it is closed.
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Nikhil Ramakrishnan <ramakrishnan.nikhil@gmail.com>
The purpose of this PR is to remove the `@InterfaceStability.Evolving` annotation from classes that were created over a year ago.
Reviewers: Jun Rao <junrao@gmail.com>