Just because a controller node sets --no-initial-controllers flag does
not mean it is necessarily running kraft.version=1. The more precise
meaning is that the controller node being formatted does not know what
kraft version the cluster should be in, and therefore it is only safe to
assume kraft.version=0. Only by setting
--standalone,--initial-controllers, or --no-initial-controllers
AND not specifying the controller.quorum.voters static config, is it
known kraft.version > 0.
For example, it is a valid configuration (although confusing) to run a
static quorum defined by controller.quorum.voters but have all the
controllers format with --no-initial-controllers. In this case,
specifying --no-initial-controllers alongside a metadata version that
does not support kraft.version=1 causes formatting to fail, which is
a regression.
Additionally, the formatter should not check the kraft.version against
the release version, since kraft.version does not actually depend on any
release version. It should only check the kraft.version against the
static voters config/format arguments.
This PR also cleans up the integration test framework to match the
semantics of formatting an actual cluster.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Kuan-Po Tseng
<brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, José
Armando García Sancio <jsancio@apache.org>
The given PR mostly fixes the order of arguments in `assertEquals()` for
the Clients module. Some minor cleanups were included with the same too.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In the create topic request we send a CreateTopic request in an
Envelope, so we need to also unpack the response correctly
Reviewers: Lucas Brutschy <lucasbru@apache.org>
Remove invalid mentions of default values for group.id,
config.storage.topic, offset.storage.topic, status.storage.topic
Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang <s7133700@gmail.com>
Improve `MetadataVersion.fromVersionString()` to take an
`enableUnstableFeature` flag, and enable `FeatureCommand` and
`StorageTool` to leverage the exception message from
`fromVersionString`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
*What*
In the implementation of KIP-1147 for console tools -
https://github.com/apache/kafka/pull/20479/files#diff-85b87c675a4b933e8e0e05c654d35d60e9cfd36cebe3331af825191b2cc688ee,
we missed adding unit tests for verifying the new
"`--formatter-property`" option.
Thanks to @Yunyung for pointing this out.
PR adds unit tests to both `ConsoleConsumerOptionsTest` and
`ConsoleShareConsumerOptionsTest` to verify the same.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
During online downgrade, when a static member using the consumer
protocol which is also the last member using the consumer protocol is
replaced by another static member using the classic protocol with the
same instance id, the latter will take the assignment of the former and
an online downgrade will be triggered.
In the current implementation, if the replacing static member has a
different subscription, no rebalance will be triggered when the
downgrade happens. The patch checks whether the static member has
changed subscription and triggers a rebalance when it does.
Reviewers: Sean Quah <squah@confluent.io>, David Jacot
<djacot@confluent.io>
1. Fix doc of `inter.worker.signature.algorithm` config in
`DistributedConfig`.
2. Improve the style of the `inter.worker.verification.algorithms` and
`worker.unsync.backoff.ms` config.
3. `INTER_WORKER_KEY_TTL_MS_MS_DOC` -> `INTER_WORKER_KEY_TTL_MS_DOC`.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Basically, one of the refactor tasks. In this PR, I have moved
`DelegationTokenPublisher` to the metadata module. Similar to the
`ScramPublisher` migration (commit feee50f476), I have moved
`DelegationTokenManager` to the server-common module, as it would
otherwise create a circular dependency. Moreover, I have made multiple
changes throughout the codebase to reference `DelegationTokenManager`
from server-common instead of the server module.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Jira: https://issues.apache.org/jira/browse/KAFKA-17554
In the previous workflow, the test passes under two conditions:
1. The `t1` thread is waiting for the main thread's `client.wakeup()`.
If successful, `t1` will wake up `t2`, allowing `t2` to complete the
future.
2. If `t1` fails to receive the `client.wakeup()` from the main thread,
`t2` will be woken up by the main thread.
In the previous implementation, we used a `CountDownLatch` to control
the execution of three threads, but it often led to race conditions.
Currently, we have modified it to use two threads to test this scenario.
I run `I=0; while ./gradlew :clients:test --tests
ConsumerNetworkClientTest.testFutureCompletionOutsidePoll --rerun
--fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done`
and pass 3000+ times.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
StreamsRebalanceListenerInvoker was implemented to match the behavior of
ConsumerRebalanceListenerInvoker, however StreamsRebalanceListener has a
subtly different interface than ConsumerRebalanceListener - it does not
throw exceptions, but returns it as an Optional.
In the interest of consistency, this change fixes this mismatch by
changing the StreamsRebalanceListener interface to behave more like the
ConsumerRebalanceListener - throwing exceptions directly.
In another minor fix, the StreamsRebalanceListenerInvoker is changed to
simply skip callback execution instead of throwing an
IllegalStateException when no streamRebalanceListener is defined. This
can happen when the consumer is closed before Consumer.subscribe is
called.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>
We are seeing cases where a Kafka Streams (KS) thread stalls for ~20
seconds. During this stall, the broker correctly aborts the open
transaction (triggered by the 10-second transaction timeout). However,
when the KS thread resumes, instead of receiving the expected
InvalidProducerEpochException (which we already handle gracefully as
part of transaction abort), the client is instead hit with an
InvalidTxnStateException. KS currently treats this as a fatal error,
causing the application to fail.
To fix this, we've added an epoch check before the verification check to
send the recoverable InvalidProducerEpochException instead of the fatal
InvalidTxnStateException. This helps safeguard both tv1 and tv2 clients
Reviewers: Justine Olshan <jolshan@confluent.io>
We need to only pass in the reset strategy, as the `logMessage`
parameter was removed.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lucas Brutschy
<lbrutschy@confluent.io>
This MR should be couple of race conditions in RemoteIndexCacheTest.
1. There was a race condition between cache-cleanup-thread and test
thread, which wants to check that cache is gone. This was fixed with
TestUtils#waitForCondition
2. After each test we check that there is not thread leak. This check
wasn't working properly, because live of thread status is set by JVM
level, we can only set interrupted status (using private native void
interrupt0(); method under the hood), but we don't really know when JVM
will change the live status of thread. To fix this I've refactored
TestUtils#assertNoLeakedThreadsWithNameAndDaemonStatus method to use
TestUtils#waitForCondition. This fix should also affect few other tests,
which were flaky because of this check. See gradle run on
[develocity](https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.storage.internals.log.RemoteIndexCacheTest&tests.sortField=FLAKY)
After fix test were run 10000 times with repeated test annotation:
`./gradlew clean storage:test --tests
org.apache.kafka.storage.internals.log.RemoteIndexCacheTest.testCacheEntryIsDeletedOnRemoval`
... `Gradle Test Run :storage:test > Gradle Test Executor 20 >
RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval() > repetition
9998 of 10000 PASSED` `Gradle Test Run :storage:test > Gradle Test
Executor 20 > RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval()
> repetition 9999 of 10000 PASSED` `Gradle Test Run :storage:test >
Gradle Test Executor 20 > RemoteIndexCacheTest >
testCacheEntryIsDeletedOnRemoval() > repetition 10000 of 10000 PASSED`
`BUILD SUCCESSFUL in 20m 9s` `148 actionable tasks: 148 executed`
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
All Kafka component register AppInfo metrics to track the application
start time or commit-id etc. These metrics are useful for monitoring and
debugging. However, the AppInfo doesn't provide client-id, which is an
important information for custom metrics reporter.
The AppInfoParser class registers a JMX MBean with the provided
client-id, but when it adds metrics to the Metrics registry, the
client-id is not included. This KIP aims to add the client-id as a tag.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The current `metadata-shell` find command throws an exception due to
child node `zkMigrationState`.
This interrupts the output and makes the CLI less usable.
Additionally, `zkMigrationState` is no longer used in Kafka 4.x, but it
is still defined under image/features, which causes misleading error
messages.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
- **Reasons:** In this case, the `exit(int statusCode)` method invokes
`exit(statusCode, null)`, which means
the `message` argument is always `null` in this code path. As a result,
assigning
`exitMessage` has no effect and can be safely removed.
- **Changes:** Remove a redundant field assignment.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Optimized `getRemainingRecords()` method by replacing inefficient
`containsKey() + get()` pattern with `getOrDefault()` to reduce map
lookups from 2 to 1 per partition.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
- Improve the docs for Record Headers.
- Add integration tests to verify that the order of headers in a record
is preserved when producing and consuming.
- Add unit tests for RecordHeaders.java.
Reviewers: Ken Huang <s7133700@gmail.com>, Hong-Yi Chen
<apalan60@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Adding the missing metric to track the number of partitions assigned.
This metric should be registered whenever the consumer is using a
groupId, and should track the number of partitions from the subscription
state, regardless of the subscription type (manual or automatic).
This PR registers the missing metric as part of the
ConsumerRebalanceMetricsManager setup. This manager is created if there
is a group ID, and reused by the consumer membershipMgr and the
streamsMemberhipMgr, so we ensure that the metric is registered for the
new consumer and streams.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, TengYao Chi
<frankvicky@apache.org>
The `metrics` attribute in `StreamsGroup` is not used anymore. This
patch removes it.
Reviewers: Ken Huang <s7133700@gmail.com>, Lucas Brutschy
<lbrutschy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.
Small intervals degrade loading times for non-transactional offset
commit workloads while large intervals degrade loading times for
transactional workloads. A default of 16,384 was chosen as a compromise.
Also add a benchmark for group coordinator loading.
Reviewers: David Jacot <djacot@confluent.io>
Follow-up to [#11193](https://github.com/apache/kafka/pull/11193). This
change adds cleanup of the temporary log and metadata directories
created by RaftManagerTest so they are removed after each test run.
Without this cleanup, the directories remain until the entire test suite
completes, leaving extra files in the system temporary directory.
Testing:
- Ran `./gradlew core:test --tests kafka.raft.RaftManagerTest` and
confirmed all tests pass.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Small improvements to the argument descriptions in the usage messages
for the console producer/consumer tools.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Fix indentation in `CoordinatorLoaderImplTest` to be consistent with the
rest of the code in the package.
Reviewers: TengYao Chi <kitingiao@gmail.com>, David Jacot <djacot@confluent.io>
The `metrics` attribute in `ConsumerGroup` is not used anymore. This
patch removes it.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Dongnuo Lyu
<dlyu@confluent.io>
This patch fixes a few typos in CoordinatorRecordTypeGenerator.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Sean Quah <squah@confluent.io>
*What*
We were missing a parantheses when we invoked a method
`supports_formatter_property()`. This would mean we would get the object
not call the function.
PR fixes this by including parantheses and invoking the actual function.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
document the behavior of "-1" (HIGH_WATERMARK)
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
*What*
https://issues.apache.org/jira/browse/KAFKA-19623
- The PR implements KIP-1147
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1147%3A+Improve+consistency+of+command-line+arguments)
for the console tools i.e. `ConsoleProducer`, `ConsoleConsumer` and
`ConsoleShareConsumer`.
- Currently the previous names for the options are still usable but
there will be warning message stating those are deprecated and will be
removed in a future version.
- I have added unit tests and also manually verified using the console
tools that things are working as expected.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Jimmy Wang
<48462172+JimmyWang6@users.noreply.github.com>
The format of the code in `CoordinatorLoaderImpl` in inconsistent with
the rest of the code in the package. This small PR fixes it.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Sean
Quah <squah@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This PR introduces an ExpiringErrorCache that temporarily stores topic
creation errors, allowing the system to provide detailed failure reasons
in subsequent heartbeat responses.
Key Designs:
Time-based expiration: Errors are cached with a TTL based on the
streams group heartbeat interval (2x heartbeat interval). This ensures
errors remain available for at least one retry cycle while preventing
unbounded growth. 2. Priority queue for efficient expiry: Uses a
min-heap to track entries by expiration time, enabling efficient cleanup
of expired entries during cache operations. 3. Capacity enforcement:
Limits cache size to prevent memory issues under high error rates. When
capacity is exceeded, oldest entries are evicted first. 4. Reference
equality checks: Uses eq for object identity comparison when cleaning up
stale entries, avoiding expensive value comparisons while correctly
handling entry updates.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
This is the first part of cleaning up of the tests in `TaskManagerTest`
- Removed dead tests
- Added new tests as suggested earlier
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Clarify rebalance callbacks behaviour (got some questions for
onPartitionsAssigned, docs where indeed confusing about the partitions
received in params). Reviewed all rebalance callbacks with it.
Reviewers: Bill Bejeck<bbejeck@apache.org>
In the consumer, we invoke the consumer rebalance onPartitionRevoked or
onPartitionLost callbacks, when the consumer closes. The point is that
the application may want to commit, or wipe the state if we are closing
unsuccessfully.
In the StreamsRebalanceListener, we did not implement this behavior,
which means when closing the consumer we may lose some progress, and in
the worst case also miss that we have to wipe our local state state
since we got fenced.
In this PR we implement StreamsRebalanceListenerInvoker, very similarly
to ConsumerRebalanceListenerInvoker and invoke it in Consumer.close.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>, TengYao Chi <frankvicky@apache.org>,
Uladzislau Blok <123193120+UladzislauBlok@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Move the `RaftManager` interface to raft module, and remove the
`register` and `leaderAndEpoch` methods since they are already part of
the RaftClient APIs.
- Rename RaftManager.scala to KafkaRaftManager.scala.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Clarify timeout errors received on send if the case is topic not in
metadata vs partition not in metadata. Add integration tests showcases
the difference Follow-up from 4.1 fix for misleading timeout error
message (https://issues.apache.org/jira/browse/KAFKA-8862)
Reviewers: TengYao Chi <frankvicky@apache.org>, Kuan-Po Tseng
<brandboat@gmail.com>
This test case ensures that the parser can convert ISO8601 correctly.
However, when the local time falls on a different day than the UTC time,
there will be an off-by-one issue.
I changed the test to convert the local time and then compare it with
the expected local time. This should fix the off-by-one issue.
[Reference
link](https://github.com/apache/kafka/pull/18611#discussion_r2318146619)
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: Alex <wenhsuan.alexyu@gmail.com>
In the current solution, we only use a heap to select the right process,
but resort to linear search for selecting a member within a process.
This means use cases where a lot of threads run within the same process
can yield slow assignment. The number of threads in a process shouldn’t
scale arbitrarily (our assumed case for benchmarking of 50 threads in a
single process seems quite extreme already), however, we can optimize
for this case to reduce the runtime further.
Other assignment algorithms assign directly on the member-level, but we
cannot do this in Kafka Streams, since we cannot assign tasks to
processes that already own the task. Defining a heap directly on members
would mean that we may have to skip through 10s of member before finding
one that does not belong to a process that does not yet own the member.
Instead, we can define a separate heap for each process, which keeps the
members of the process by load. We can only keep the heap as long as we
are only changing the load of the top-most member (which we usually do).
This means we keep track of a lot of heaps, but since heaps are backed
by arrays in Java, this should not result in extreme memory
inefficiencies.
In our worst-performing benchmark, this improves the runtime by ~2x on
top of the optimization above.
Also piggybacked are some minor optimizations / clean-ups: -
initialize HashMaps and ArrayLists with the right capacity - fix some
comments - improve logging output
Note that this is a pure performance change, so there are no changes to
the unit tests.
Reviewers: Bill Bejeck<bbejeck@apache.org>
This PR migrates the `TransactionLogTest` from Scala to Java for better
consistency with the rest of the test suite and to simplify future
maintenance.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>