Move the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidate two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Test the `StreamsGroupDescribeRequest` RPC and corresponding responses
for situations where:
- `streams.version` is not upgraded to 1
- `streams.version` is enabled and multiple groups listen to the same
topic.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
We can rewrite this class from Scala to Java and move it to the
server-common module. To maintain backward compatibility, we should
keep the logger name `state.change.logger`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
JIRA: KAFKA-18185
This is a follow-up of #17614. The patch removes the
`internal.leave.group.on.close` config.
Reviewers: Sophie Blee-Goldman <ableegoldman@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>, Bill Bejeck <bbejeck@apache.org>
Partially addresses KAFKA-15873. When filtering and sorting, we should
apply the filter before sorting the topics. The order in which entries
are added to unauthorizedForDescribeTopicMetadata is not relevant, as it
is a HashSet.
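For illustration, the intended order of operations looks like this (a
sketch; `isAuthorizedForDescribe` is a hypothetical stand-in for the
actual authorization check):
```
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

class TopicFiltering {
    // Sort only the topics that survive the describe-authorization filter;
    // the unauthorized set can stay an unordered HashSet.
    static List<String> visibleTopics(List<String> topics, Predicate<String> isAuthorizedForDescribe) {
        return topics.stream()
                .filter(isAuthorizedForDescribe)
                .sorted(Comparator.naturalOrder())
                .toList();
    }
}
```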
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Calvin Liu
<caliu@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Some sections are not very clear, and we need to update the
documentation.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Handle the case when the topology is not configured.
In the streams group heartbeat, we validate the topology set for the
group against the topic metadata, to generate the "configured topology"
which has a specific number of partitions for each topic.
In streams group describe, we use the configured topology to expose this
information to the user. However, we leave the topology information as
null in the streams group describe response, if the topology is not
configured. This triggers an IllegalStateException in the admin client
implementation.
Instead, we should expose the unconfigured topology when the configured
topology is not available, which will still expose useful information.
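A minimal sketch of the fallback (generic and illustrative, not the
actual handler code):
```
class TopologyFallback {
    // Prefer the configured topology, which carries the resolved partition
    // counts; otherwise expose the unconfigured topology rather than leaving
    // the response field null, which triggered the IllegalStateException in
    // the admin client.
    static <T> T topologyToExpose(T configured, T unconfigured) {
        return configured != null ? configured : unconfigured;
    }
}
```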
Reviewers: Matthias J. Sax <matthias@confluent.io>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Just because a controller node sets the --no-initial-controllers flag
does not mean it is necessarily running kraft.version=1. The more
precise meaning is that the controller node being formatted does not
know what kraft version the cluster should be in, and therefore it is
only safe to assume kraft.version=0. Only by setting --standalone,
--initial-controllers, or --no-initial-controllers AND not specifying
the controller.quorum.voters static config is it known that
kraft.version > 0.
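Stated as a predicate, the rule reads roughly as follows (a sketch with
abbreviated flag names, not the actual formatter code):
```
class KRaftVersionRule {
    // kraft.version > 0 may only be assumed when one of --standalone,
    // --initial-controllers, or --no-initial-controllers was passed AND
    // controller.quorum.voters is not set; otherwise formatting must
    // assume kraft.version = 0.
    static boolean canAssumeKRaftVersionAboveZero(boolean controllersFlagGiven, boolean staticVotersConfigured) {
        return controllersFlagGiven && !staticVotersConfigured;
    }
}
```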
For example, it is a valid configuration (although confusing) to run a
static quorum defined by controller.quorum.voters but have all the
controllers format with --no-initial-controllers. In this case,
specifying --no-initial-controllers alongside a metadata version that
does not support kraft.version=1 causes formatting to fail, which is
a regression.
Additionally, the formatter should not check the kraft.version against
the release version, since kraft.version does not actually depend on any
release version. It should only check the kraft.version against the
static voters config/format arguments.
This PR also cleans up the integration test framework to match the
semantics of formatting an actual cluster.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Kuan-Po Tseng
<brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, José
Armando García Sancio <jsancio@apache.org>
When creating topics, we send the CreateTopic request in an Envelope,
so we need to also unpack the response correctly.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
Improve `MetadataVersion.fromVersionString()` to take an
`enableUnstableFeature` flag, and enable `FeatureCommand` and
`StorageTool` to leverage the exception message from
`fromVersionString`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This is one of the refactoring tasks. In this PR, I have moved
`DelegationTokenPublisher` to the metadata module. Similar to the
`ScramPublisher` migration (commit feee50f476), I have moved
`DelegationTokenManager` to the server-common module, as it would
otherwise create a circular dependency. Moreover, I have made multiple
changes throughout the codebase to reference `DelegationTokenManager`
from server-common instead of the server module.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
StreamsRebalanceListenerInvoker was implemented to match the behavior of
ConsumerRebalanceListenerInvoker; however, StreamsRebalanceListener has
a subtly different interface than ConsumerRebalanceListener: it does not
throw exceptions, but returns them as an Optional.
In the interest of consistency, this change fixes the mismatch by
changing the StreamsRebalanceListener interface to behave like
ConsumerRebalanceListener, throwing exceptions directly.
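A simplified sketch of the shape of the change (method name
illustrative, not the real signature):
```
import java.util.Optional;

// Before (simplified): errors were returned rather than thrown.
interface ReturningListener {
    Optional<Exception> onTasksRevoked();
}

// After (simplified): errors are thrown directly, mirroring
// ConsumerRebalanceListener.
interface ThrowingListener {
    void onTasksRevoked(); // implementations throw on failure
}
```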
In another minor fix, the StreamsRebalanceListenerInvoker is changed to
simply skip callback execution instead of throwing an
IllegalStateException when no streamRebalanceListener is defined. This
can happen when the consumer is closed before Consumer.subscribe is
called.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>
We are seeing cases where a Kafka Streams (KS) thread stalls for ~20
seconds. During this stall, the broker correctly aborts the open
transaction (triggered by the 10-second transaction timeout). However,
when the KS thread resumes, instead of receiving the expected
InvalidProducerEpochException (which we already handle gracefully as
part of transaction abort), the client is instead hit with an
InvalidTxnStateException. KS currently treats this as a fatal error,
causing the application to fail.
To fix this, we've added an epoch check before the verification check to
send the recoverable InvalidProducerEpochException instead of the fatal
InvalidTxnStateException. This helps safeguard both tv1 and tv2 clients.
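Conceptually, the new ordering of the checks looks like this (a sketch,
not the actual broker code):
```
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException;

class EpochBeforeVerification {
    // Check the producer epoch before the verification state: a stale epoch
    // yields the recoverable InvalidProducerEpochException (already handled
    // as part of transaction abort) instead of the fatal
    // InvalidTxnStateException.
    static void check(short producerEpoch, short currentEpoch, boolean verificationStateValid) {
        if (producerEpoch < currentEpoch) {
            throw new InvalidProducerEpochException("Producer epoch " + producerEpoch
                    + " is older than current epoch " + currentEpoch);
        }
        if (!verificationStateValid) {
            throw new InvalidTxnStateException("Record batch failed transaction verification");
        }
    }
}
```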
Reviewers: Justine Olshan <jolshan@confluent.io>
When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.
Small intervals degrade loading times for non-transactional offset
commit workloads while large intervals degrade loading times for
transactional workloads. A default of 16,384 was chosen as a compromise.
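A sketch of the loading loop with the periodic snapshot reset;
`SnapshotRegistryLike` is a stand-in, not the real snapshot registry
API:
```
class LoaderSnapshotting {
    static final int SNAPSHOT_INTERVAL = 16_384;

    // Every SNAPSHOT_INTERVAL records, drop the accumulated snapshots and
    // start a fresh one so nested Timeline collections referenced only by
    // old snapshots become garbage-collectable during loading.
    static void maybeResetSnapshots(long recordsReplayed, long offset, SnapshotRegistryLike registry) {
        if (recordsReplayed > 0 && recordsReplayed % SNAPSHOT_INTERVAL == 0) {
            registry.deleteAllSnapshots();   // release references held by old snapshots
            registry.createSnapshot(offset); // fresh baseline for subsequent records
        }
    }

    interface SnapshotRegistryLike {
        void deleteAllSnapshots();
        void createSnapshot(long epoch);
    }
}
```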
Also add a benchmark for group coordinator loading.
Reviewers: David Jacot <djacot@confluent.io>
Follow-up to [#11193](https://github.com/apache/kafka/pull/11193). This
change adds cleanup of the temporary log and metadata directories
created by RaftManagerTest so they are removed after each test run.
Without this cleanup, the directories remain until the entire test suite
completes, leaving extra files in the system temporary directory.
Testing:
- Ran `./gradlew core:test --tests kafka.raft.RaftManagerTest` and
confirmed all tests pass.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR introduces an ExpiringErrorCache that temporarily stores topic
creation errors, allowing the system to provide detailed failure reasons
in subsequent heartbeat responses.
Key Designs:
1. Time-based expiration: Errors are cached with a TTL based on the
streams group heartbeat interval (2x heartbeat interval). This ensures
errors remain available for at least one retry cycle while preventing
unbounded growth.
2. Priority queue for efficient expiry: Uses a min-heap to track entries
by expiration time, enabling efficient cleanup of expired entries during
cache operations.
3. Capacity enforcement: Limits cache size to prevent memory issues
under high error rates. When capacity is exceeded, oldest entries are
evicted first.
4. Reference equality checks: Uses eq for object identity comparison
when cleaning up stale entries, avoiding expensive value comparisons
while correctly handling entry updates.
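A condensed sketch of the cache described above (field and method names
illustrative, not the actual class):
```
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Sketch: a TTL cache keyed by topic, with a min-heap ordered by expiry
// time for cheap cleanup and a capacity bound evicting oldest entries first.
class ExpiringErrorCacheSketch {
    private record Entry(String topic, String error, long expiresAtMs) {}

    private final long ttlMs;      // e.g. 2x the streams group heartbeat interval
    private final int maxEntries;
    private final Map<String, Entry> byTopic = new HashMap<>();
    private final PriorityQueue<Entry> byExpiry =
            new PriorityQueue<>(Comparator.comparingLong(Entry::expiresAtMs));

    ExpiringErrorCacheSketch(long ttlMs, int maxEntries) {
        this.ttlMs = ttlMs;
        this.maxEntries = maxEntries;
    }

    void put(String topic, String error, long nowMs) {
        expire(nowMs);
        Entry e = new Entry(topic, error, nowMs + ttlMs);
        byTopic.put(topic, e);
        byExpiry.add(e);
        while (byTopic.size() > maxEntries && !byExpiry.isEmpty()) {
            removeIfCurrent(byExpiry.poll()); // evict oldest first
        }
    }

    String get(String topic, long nowMs) {
        expire(nowMs);
        Entry e = byTopic.get(topic);
        return e == null ? null : e.error();
    }

    private void expire(long nowMs) {
        while (!byExpiry.isEmpty() && byExpiry.peek().expiresAtMs() <= nowMs) {
            removeIfCurrent(byExpiry.poll());
        }
    }

    // Reference equality: only remove when the map still holds this exact
    // entry, so a newer entry for the same topic is never clobbered.
    private void removeIfCurrent(Entry e) {
        if (byTopic.get(e.topic()) == e) {
            byTopic.remove(e.topic());
        }
    }
}
```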
Reviewers: Lucas Brutschy <lucasbru@apache.org>
- Move the `RaftManager` interface to the raft module, and remove the
`register` and `leaderAndEpoch` methods since they are already part of
the RaftClient APIs.
- Rename RaftManager.scala to KafkaRaftManager.scala.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Clarify timeout errors received on send, distinguishing the case where
the topic is not in metadata from the case where the partition is not in
metadata. Add integration tests showcasing the difference. Follow-up
from the 4.1 fix for the misleading timeout error message
(https://issues.apache.org/jira/browse/KAFKA-8862).
Reviewers: TengYao Chi <frankvicky@apache.org>, Kuan-Po Tseng
<brandboat@gmail.com>
This PR migrates the `TransactionLogTest` from Scala to Java for better
consistency with the rest of the test suite and to simplify future
maintenance.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
ApiError.fromThrowable(t) is going to return a generic
Errors.UNKNOWN_SERVER_ERROR back to the calling client (a CLI, for
instance), e.g. if the broker has an authZ issue with ZK. Such an
UnknownServerException should have a matching ERROR-level log in the
broker logs, IMHO, to make it easier to troubleshoot.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
## Changes
This PR improves the stability of the
PlaintextAdminIntegrationTest.testElectPreferredLeaders test by
introducing short Thread.sleep() delays before invoking:
- changePreferredLeader()
- waitForBrokersOutOfIsr()
## Reasons
- Metadata propagation for partition2:
Kafka requires time to propagate the updated leader metadata across all
brokers. Without waiting, metadataCache may return outdated leader
information for partition2.
- Eviction of broker1 from the ISR:
To simulate a scenario where broker1 is no longer eligible as leader,
the test relies on broker1 being removed from the ISR (e.g., due to
intentional shutdown). This eviction is not instantaneous and requires a
brief delay before Kafka reflects the change.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<kitingiao@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR is a follow-up from https://github.com/apache/kafka/pull/20468.
It basically does two things:
1. Moves the variable to the catch block, as it is used only there.
2. Refactors FeaturesPublisher to handle exceptions the same way as
ScramPublisher and the other publishers :)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
---------
Signed-off-by: see-quick <maros.orsak159@gmail.com>
The method rollbackOrProcessStateUpdates in SharePartition received two
separate lists of updatedStates (InFlightState) and stateBatches
(PersisterStateBatch). This PR introduces a new class called
`PersisterBatch` which encompasses both these objects.
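The shape of the new wrapper is roughly as follows (a sketch assuming
the InFlightState and PersisterStateBatch types named above):
```
// Sketch: pair each in-flight state update with its persister state batch
// so rollbackOrProcessStateUpdates can take one list instead of two.
final class PersisterBatch {
    private final InFlightState updatedState;
    private final PersisterStateBatch stateBatch;

    PersisterBatch(InFlightState updatedState, PersisterStateBatch stateBatch) {
        this.updatedState = updatedState;
        this.stateBatch = stateBatch;
    }

    InFlightState updatedState() { return updatedState; }
    PersisterStateBatch stateBatch() { return stateBatch; }
}
```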
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Integration tests for Streams Admin-related APIs.
Previous PR: https://github.com/apache/kafka/pull/20244
This one adds:
- Integration test for Admin#listStreamsGroupOffsets API
- Integration test for Admin#deleteStreamsGroupOffsets API
- Integration test for Admin#alterStreamsGroupOffsets API
Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Lucas Brutschy
<lucasbru@apache.org>
As the PR title suggests, this PR is an attempt to perform some cleanups
in the server module. The changes are mostly around the use of the
Record type for some classes, the use of enhanced switch, etc.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
We make three main changes in this PR:
- Disallowing null values for most LIST-type configurations makes sense,
since users cannot explicitly set a configuration to null in a
properties file. Therefore, only configurations with a default value of
null should be allowed to accept null.
- Disallowing duplicate values is reasonable, as there are currently no
known configurations in Kafka that require specifying the same value
multiple times. Allowing duplicates is both rare in practice and
potentially confusing to users.
- Disallowing empty lists, even though many configurations currently
accept them. In practice, setting an empty list for several of these
configurations can lead to server startup failures or unexpected
behavior. Therefore, enforcing non-empty lists helps prevent
misconfiguration and improves system robustness.
These changes may introduce some backward incompatibility, but this
trade-off is justified by the significant improvements in safety,
consistency, and overall user experience.
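A minimal sketch of the kind of validation these rules imply (a
hypothetical validator, not the actual ConfigDef change):
```
import org.apache.kafka.common.config.ConfigException;

import java.util.HashSet;
import java.util.List;

class ListConfigRules {
    // Hypothetical validator for the new LIST-type rules: null only when
    // the default is null, no empty lists, no duplicate entries.
    static void validate(String name, List<String> values, boolean hasNullDefault) {
        if (values == null) {
            if (!hasNullDefault) {
                throw new ConfigException(name, null, "null is only allowed when the default value is null");
            }
            return;
        }
        if (values.isEmpty()) {
            throw new ConfigException(name, values, "empty list is not allowed");
        }
        if (new HashSet<>(values).size() != values.size()) {
            throw new ConfigException(name, values, "duplicate entries are not allowed");
        }
    }
}
```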
Additionally, we introduce two minor adjustments:
- Reclassify some STRING-type configurations as LIST-type, particularly
those using comma-separated values to represent multiple entries. This
change reflects the actual semantics used in Kafka.
- Update the default values for some configurations to better align with
other configs.
These changes will not introduce any compatibility issues.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This reverts commit d86ba7f54a.
Reverting since we are planning to change how KIP-966 is implemented. We
should revert this RPC until we have more clarity on how this KIP will
be executed.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This change adds:
- Integration test for `Admin#describeStreamsGroups` API
- Integration test for `Admin#deleteStreamsGroup` API
Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Lucas Brutschy
<lucasbru@apache.org>
---------
Co-authored-by: Lucas Brutschy <lbrutschy@gmail.com>
* Log an error message if `broker.heartbeat.interval.ms * 2` is larger
than `broker.session.timeout.ms`.
* Add test case
`testLogBrokerHeartbeatIntervalMsShouldBeLowerThanHalfOfBrokerSessionTimeoutMs`.
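The check amounts to something like the following (a sketch; config
plumbing omitted):
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BrokerHeartbeatConfigCheck {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerHeartbeatConfigCheck.class);

    // Log an error when twice the heartbeat interval exceeds the session
    // timeout, since the broker may then miss enough heartbeats to be
    // fenced under normal operation.
    static void validate(long brokerHeartbeatIntervalMs, long brokerSessionTimeoutMs) {
        if (2 * brokerHeartbeatIntervalMs > brokerSessionTimeoutMs) {
            LOG.error("broker.heartbeat.interval.ms * 2 ({}) is larger than broker.session.timeout.ms ({})",
                    2 * brokerHeartbeatIntervalMs, brokerSessionTimeoutMs);
        }
    }
}
```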
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add a lower bound to num.replica.fetchers.
Reviewers: PoAn Yang <payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>,
Ken Huang <s7133700@gmail.com>, jimmy <wangzhiwang611@gmail.com>,
Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Clean up the default configs in
AutoTopicCreationManager#createStreamsInternalTopics. The streams
protocol should be consistent with Kafka Streams on the classic
protocol, which creates the internal topics using CreateTopic and
therefore uses the controller config.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
These tests were written while finalizing the approach for making the
inflight state class thread-safe, but the approach later changed and the
lock is now always required by SharePartition to change inflight state.
Hence these tests are incorrect and do not add any value.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Minor PR to update the name of maxInFlightMessages to maxInFlightRecords
to
maintain consistency in share partition related classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>
As per the suggestion by @adixitconfluent and @chirag-wadhwa5,
[here](https://github.com/apache/kafka/pull/20395#discussion_r2300810004),
I have refactored the variable and method names in the code.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chirag Wadhwa
<cwadhwa@confluent.io>
This is the first part of the implementation of
[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)
The purpose of this pull request is for the broker to start returning
the correct offset when it receives a -6 as a timestamp in a ListOffsets
API request.
Added unit tests for the new timestamp.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
The PR fixes the batch alignment issue when partitions are re-assigned.
During the initial read of state, the batches can be broken arbitrarily.
Say the start offset is 10 and the cache contains a [15-18] batch during
initialization. When a fetch happens at offset 10 and the fetched batch
contains 10 records, i.e. [10-19], then correct batches will be created
if maxFetchRecords is greater than 10. But if maxFetchRecords is less
than 10, then the last offset of the batch is determined, which will be
19. Hence the acquire method will incorrectly create a batch of [10-19]
while [15-18] already exists. The below check is required to resolve the
issue:
```
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
    lastAcquiredOffset = lastOffset;
}
```
While testing other cases, further issues were found while updating the
gap offset, acquiring records prior to the share partition's end offset,
and determining the next fetch offset with compacted topics. All these
issues can arise mainly during the initial read window after partition
re-assignment.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit
<adixit@confluent.io>, Chirag Wadhwa <cwadhwa@confluent.io>
This is a follow-up PR for https://github.com/apache/kafka/pull/19699.
* Update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions.
* Update the `TxnTransitMetadata` comment, because it's not immutable.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Justine Olshan
<jolshan@confluent.io>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
**Changes**: Use ClusterTest to rewrite
EligibleLeaderReplicasIntegrationTest.
**Validation**: Run the test 50 times locally with consistent success.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
As per the current implementation in archiveRecords, when the LSO is
updated, if we have multiple record batches before the new LSO, then
only the first one gets archived. This is because of the following lines
of code:
```
isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);
isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);
```
The first record / batch will make `isAnyOffsetArchived` /
`isAnyBatchArchived` true, after which this line of code will
short-circuit and the methods `archivePerOffsetBatchRecords` /
`archiveCompleteBatch` will not be called again. This PR changes the
order of the expressions so that the short-circuit does not prevent
archiving all the required batches, as shown below.
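With the reorder, the archive calls are evaluated unconditionally (a
sketch using the calls quoted above):
```
// Evaluate the archive methods first so they run for every batch; the
// accumulated flags are OR-ed afterwards instead of short-circuiting them.
isAnyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState)
        || isAnyOffsetArchived;
isAnyBatchArchived = archiveCompleteBatch(inFlightBatch, initialState) || isAnyBatchArchived;
```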
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
The `record-size` and `throughput` arguments don’t work in
`TestRaftServer`. The `recordsPerSec` and `recordSize` values are always
hard-coded.
- Fix the issue where the `recordsPerSec` and `recordSize` values were
hard-coded.
- Add a "Required" description to command-line options to make it clear
to users.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>