Currently, if we receive just a control record in the
`ShareFetchResponse`, the currentFetch in `ShareConsumerImpl` is
not updated because the record is ignored. But in the process, we lose the
acknowledgement for this control record, which is a gap.
PR fixes this by adding an additional map for control record
acknowledgements in `ShareFetchEvent`.
This updates both `ShareConsumerImpl` and `ShareConsumeRequestManager`
to accommodate the additional map.
Added unit tests in `ShareConsumerImplTest` and
`ShareConsumeRequestManagerTest` to verify the changes.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The PR adds `MaxRecords` to the share fetch request and also adds
`AcquisitionLockTimeout` to the share fetch response. The PR also removes
the internal broker config `max.fetch.records`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch moves `ConsumerTopicCreationTest` to the
`client-integration-tests` module and rewrites it in Java.
The patch also streamlines the test flow.
In the Scala version, there is a producer that produces messages, but
this is not the main purpose of the `ConsumerTopicCreationTest`.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Enhance the documentation for Admin#describeCluster and
Admin#describeConfigs to clarify their behavior when using
bootstrap.controllers and bootstrap.servers.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* This PR adds the implementation for the initialize share groups call from the Group
Coordinator perspective.
* The initialize call on the persister instance will be invoked by the
`GroupCoordinatorService`, based on the response of
`GroupCoordinatorShard.shareGroupHeartbeat`. If there is a new topic
subscription or a member assignment change (topic partitions incremented),
the delta share partitions corresponding to the share group in question
are returned as an optional initialize request.
* The request is then sent to the share coordinator as an encapsulated
timer task because we want the heartbeat response to be returned asynchronously.
* Tests have been added for `GroupCoordinatorService` and
`GroupMetadataManager`. Existing tests have also been updated.
* A new formatter `ShareGroupStatePartitionMetadataFormatter` has been
added for debugging.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The generated request data types' constructors take a `Readable` as input. However, the parse method in
`AbstractRequest` takes a `ByteBuffer`, so to create the corresponding request data objects, each
concrete request class wraps the `ByteBuffer` in a `ByteBufferAccessor`.
This boilerplate is present in all the concrete request classes. This change updates `AbstractRequest`'s parse method so that subclasses can simply pass the `Readable` they receive directly to the request data classes.
The same change is made to the serialize method to maintain symmetry.
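For illustration, a rough before/after sketch of the pattern (using `FetchRequestData` as an example; the exact `AbstractRequest` signatures may differ slightly from this sketch):
```java
// Hedged sketch of the before/after pattern, not the exact code in the PR.
import java.nio.ByteBuffer;

import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Readable;

final class ParseSketch {
    // Before: each concrete request class wrapped the buffer itself.
    static FetchRequestData parseWithBuffer(ByteBuffer buffer, short version) {
        return new FetchRequestData(new ByteBufferAccessor(buffer), version);
    }

    // After: the Readable handed out by AbstractRequest.parse is forwarded directly.
    static FetchRequestData parseWithReadable(Readable readable, short version) {
        return new FetchRequestData(readable, version);
    }
}
```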
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio
<jsancio@apache.org>, Artem Livshits <alivshits@confluent.io>,
Truc Nguyen <trnguyen@confluent.io>
Previously, the `ShareConsumer.commitAsync()` method retried sending
`ShareAcknowledge` requests indefinitely. Now it uses the
defaultApiTimeout config to expire the request so that it does not retry forever.
The PR also fixes a bug in processing `commitSync()` requests, where an
additional check is needed to verify that the node is free.
Co-authored-by: Andrew Schofield <aschofield@confluent.io>
Reviewers: Andrew Schofield <aschofield@confluent.io>
Migrate ConsumerRebootstrapTest to the new test infra and remove the old
Scala test.
The PR makes the following changes.
* Migrated `ConsumerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
* Removed the `RebootstrapTest.scala`.
Default `ConsumerRebootstrap` config:
```java
properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000L");
```
Screenshots in the PR show the test case for the consumer with rebootstrap enabled and with rebootstrap disabled.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit adds error handling to the Streams heartbeat request
manager.
Errors can occur while sending a heartbeat request and when a response
with an error code that is not NONE is received.
Some errors are handled explicitly to recover from them or to log
specific messages. All the others are handled as fatal errors.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Previously, we had `AbstractConfig#preProcessParsedConfig` but did not use
its return value.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The PR changed three things.
* Migrated `ProducerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
Default `ProducerRebootstrap` config:
```java
properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000L");
```
Screenshots in the PR show the test case for the producer with rebootstrap enabled and with rebootstrap disabled.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The `clients-integration-tests` module doesn't have a `log4j2.yaml` to
configure logging, so we should add one.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
ReplicaSelector implementations can implement Monitorable to register their own metrics.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang <s7133700@gmail.com>
This patch is the second of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.
The patch removes `group.coordinator.new.enable` (internal config) and
all its usages (integration tests, unit tests, etc.). It also cleans up
`KafkaApis` to remove logic only used by the old group coordinator.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Reading the `lastOffset` requires loading the entire batch header, so we should check `baseOffset` instead.
To optimize this, we need to update the search logic. The previous
approach simply checked whether each batch's `lastOffset()` was greater
than or equal to the target offset. Once it found the first batch that
met this condition, it returned that batch immediately.
Now that we are using `baseOffset()`, we need to handle a special case:
if the `targetOffset` falls between the `lastOffset` of the previous
batch and the `baseOffset` of the matching batch, we should select the
matching batch. The updated logic is structured as follows (a sketch is given after the list):
1. First, if `baseOffset` exactly equals `targetOffset`, return that batch immediately.
2. If we find the first batch whose `baseOffset` is greater than `targetOffset`,
check whether the previous batch contains the target: if it does, return the
previous batch; if there is no previous batch, or the previous batch does not
contain the target, return the current batch.
3. After iterating through all batches, check whether the last batch contains
the target offset.
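A minimal sketch of this search, assuming a simple `Batch` abstraction with `baseOffset()`/`lastOffset()` accessors rather than the actual record batch classes:
```java
import java.util.List;
import java.util.OptionalInt;

final class BatchSearchSketch {
    interface Batch {
        long baseOffset();
        long lastOffset(); // potentially expensive; called at most once per search
    }

    // Returns the index of the batch that should contain targetOffset, if any.
    static OptionalInt findBatch(List<Batch> batches, long targetOffset) {
        for (int i = 0; i < batches.size(); i++) {
            long baseOffset = batches.get(i).baseOffset();
            if (baseOffset == targetOffset)
                return OptionalInt.of(i);                 // step 1: exact match
            if (baseOffset > targetOffset) {
                // step 2: first batch starting after the target; prefer the
                // previous batch only if it actually contains the target.
                if (i > 0 && batches.get(i - 1).lastOffset() >= targetOffset)
                    return OptionalInt.of(i - 1);
                return OptionalInt.of(i);
            }
        }
        // step 3: the last batch may still contain the target.
        int last = batches.size() - 1;
        if (last >= 0 && batches.get(last).lastOffset() >= targetOffset)
            return OptionalInt.of(last);
        return OptionalInt.empty();
    }
}
```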
This code path is not thread-safe, so we need to guard against `EOFException`.
To avoid this exception, I am still using an early return. In this
scenario, `lastOffset` is still used, but it is executed at most once per
search. In most cases, this results in an optimization.
Test: Verifying Memory Usage Improvement
To evaluate whether this optimization helps, I followed the steps below
to monitor memory usage:
1. Start a Standalone Kafka Server
```sh
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties
```
2. Use Performance Console Tools to Produce and Consume Records
**Produce Records:**
```sh
./kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000000 \
--record-size 100 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
```
**Consume Records:**
```sh
./bin/kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000000 \
--bootstrap-server localhost:9092
```
It can be observed that memory usage has significantly decreased.
(Memory-usage screenshots comparing trunk and this PR are in the original PR.)
Reviewers: Kirk True <kirk@kirktrue.pro>, TengYao Chi
<kitingiao@gmail.com>, David Arthur <mumrah@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch filters out the topic describe unauthorized topics from the
StreamsGroupHeartbeat and StreamsGroupDescribe response.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This patch adds logic to enable and handle two phase commit (2PC)
transactions following KIP-939.
The changes made are as follows:
1) Add a new broker config called
**transaction.two.phase.commit.enable** which is set to false by default
2) Add new flags **enableTwoPCFlag** and **keepPreparedTxn** to
handleInitProducerId
3) Return an error if keepPreparedTxn is set to true (for now)
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan
<jolshan@confluent.io>
Move share consumer to the clients-integration-tests module and use `@BeforeEach` to set up
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Given that the `core` module will be separated into other small modules,
this test will not be added to the core module.
Instead, I added it to the `clients-integration-tests` module since it
focuses on the admin client. The patch should include the following test cases:
1. A topic-related static config is added to the quorum controller. The
configs from topic creation should include it, but `describeConfigs`
does not.
2. A topic-related static config is added to the quorum controller. The
configs from topic creation should include it, and `describeConfigs`
does too if the admin client is using `bootstrap.controllers`.
3. A topic-related static config is added to the broker. The configs from
topic creation should NOT include it, but `describeConfigs` does.
4. A topic-related static config is added to the broker. The configs from
topic creation should NOT include it, and `describeConfigs` does not
either if the admin client is using `bootstrap.controllers`.
Additionally, the docs of `STATIC_BROKER_CONFIG` should mention the impact on "controller.properties". These test cases should leverage the new test infra, since it allows us to define configs for brokers and controllers individually.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The Streams heartbeat request has some fields that are always sent.
Those are:
- group ID
- member ID
- member epoch
- group instance ID (if static membership is used)
Then it has fields that are only sent when joining:
- topology and topology epoch
- rebalance timeout
- process ID
- endpoint
- client tags
Finally, the assignment is only sent if it changed compared to the last
sent request.
Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
Currently, when using serializers like the Cloud Event Serializer, we
need a workaround so it doesn't throw an error. Using the method that
takes the headers would prevent this. Since the default implementation
just calls the method without the headers, this is expected to be fully
backwards compatible.
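For reference, a simplified sketch of the relevant default method on the `Serializer` interface (the real interface in `org.apache.kafka.common.serialization` also declares configure/close):
```java
// Excerpt-style sketch, not the full Serializer interface.
import org.apache.kafka.common.header.Headers;

public interface SerializerSketch<T> {
    byte[] serialize(String topic, T data);

    // The default ignores headers and delegates to the two-argument method, so
    // callers switching to this overload stay compatible with serializers that
    // only implement serialize(topic, data). Header-aware serializers (such as
    // a CloudEvents serializer) can override it.
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
}
```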
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>
Mark the following tests as flaky:
* StickyAssignorTest > testLargeAssignmentAndGroupWithUniformSubscription
* DeleteSegmentsByRetentionTimeTest
* QuorumControllerTest > testUncleanShutdownBrokerElrEnabled
Reviewers: Andrew Schofield <aschofield@confluent.io>
This PR aims to remove the usage of partition max bytes from share fetch
requests. Partition max bytes is now defined by
`PartitionMaxBytesStrategy`, which was added to the broker as part of PR
https://github.com/apache/kafka/pull/17870
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit adds the conditions to decide when a Streams group heartbeat
should be sent.
A heartbeat should be sent when:
- the group coordinator is available
- the member is part of the Streams group or wants to join it
- the heartbeat interval expired, or the member is leaving the group or
acknowledging the assignment
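Expressed as a single condition, the decision looks roughly like the following sketch (parameter names are illustrative placeholders, not the request manager's actual fields):
```java
// Illustrative only: placeholder names for state tracked by the Streams
// heartbeat request manager.
final class HeartbeatSendCondition {
    static boolean shouldSendHeartbeat(boolean coordinatorAvailable,
                                       boolean memberInGroupOrJoining,
                                       boolean heartbeatIntervalExpired,
                                       boolean memberLeaving,
                                       boolean acknowledgingAssignment) {
        return coordinatorAvailable
                && memberInGroupOrJoining
                && (heartbeatIntervalExpired || memberLeaving || acknowledgingAssignment);
    }
}
```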
This commit does not implement:
- not sending fields that did not change
- handling errors
Reviewers: Zheguang Zhao <zheguang.zhao@alumni.brown.edu>, Lucas
Brutschy <lbrutschy@confluent.io>
Recently, we found a regression that could have been detected by static
analysis: during a refactoring, a local variable was no longer passed to
a method and was left unused. It was fixed in
[7a749b5](7a749b589f),
but almost slipped into 4.0. Unused variables are typically detected by
IDEs, but this is insufficient to prevent these kinds of bugs. This
change enables unused local variable detection in checkstyle for Kafka.
A few notes on the usage:
- There are two situations in which people actually want to have a local
variable but not use it. First, there are `for (Type ignored:
collection)` loops which have to loop `collection.length` number of
times, but that do not use `ignored` in the loop body. These are
typically still easier to read than a classical `for` loop. Second, some
IDEs detect it if a return value of a function such as `File.delete` is
not being used. In this case, people sometimes store the result in an
unused local variable to make ignoring the return value explicit and to
avoid the squiggly lines.
- In Java 22, unused local variables can be replaced by a single
underscore `_`. This is supported by checkstyle. In pre-22 versions,
IntelliJ allows such variables to be named `ignored` to suppress the
unused local variable warning. This pattern is often (but not
consistently) used in the Kafka codebase. It is, however, not
supported by checkstyle.
Since we cannot switch to Java 22 yet, and we want to use automated
detection using checkstyle, we have to resort to annotating the unused
local variables with `@SuppressWarnings("UnusedLocalVariable")`. We have
to apply this in 11 cases across the Kafka codebase. While not being
pretty, I'd argue it's worth it to prevent bugs like the one fixed in
[7a749b5](7a749b589f).
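A typical suppression looks like this (an illustrative example, not one of the 11 actual occurrences):
```java
import java.io.File;

final class UnusedLocalVariableExample {
    static void cleanUp(File file) {
        // The return value of File.delete() is deliberately ignored; storing it in
        // an unused local makes that explicit, and the annotation silences the new
        // checkstyle UnusedLocalVariable check.
        @SuppressWarnings("UnusedLocalVariable")
        boolean ignored = file.delete();
    }
}
```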
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur
<mumrah@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bruno
Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>
- Adding a space, article and punctuation to the Producer config doc
strings for consistency and readability.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Justine Olshan <jolshan@confluent.io>
Adds `describeStreamsGroup` to Admin API.
This exposes the result of the `DESCRIBE_STREAMS_GROUP` RPC in the Admin
API.
Reviewers: Bill Bejeck <bill@confluent.io>
This patch is a first step towards resolving KAFKA-18046. Apache Kafka
4.0 ships with log4j2, and the issue raised in the ticket causes high CPU
usage on the fetch path because LoggerFactory.getLogger() is called when
handling every fetch response. Hence, I propose to fix that by caching
the Logger used by the `CompletedFetch` class.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Logging on a per-batch basis is very chatty, and should only be done at
TRACE level to avoid spamming DEBUG logs.
Reviewers: Justine Olshan <jolshan@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
User testing of the `KafkaShareConsumer` interface has revealed some
areas which confuse people. One of these is the way that it decides
whether you want to use implicit or explicit acknowledgement of records
by observing which calls the application issues. We are taking the
opportunity to refine the interface before it is finalised.
This PR introduces an experimental configuration called
`internal.share.acknowledgement.mode` which can be used to make the
application declare which kind of acknowledgement it wishes to use. We
plan to try out the configuration, assess whether it has helped, and
then create a proper consumer configuration that makes this area better.
That would require a lot of change in the tests, which explains why this
initial PR only has a small number of tests.
Reviewers: David Arthur <mumrah@gmail.com>
Implement Admin API extensions beyond list/describe group (delete group,
offset-related APIs).
* adds methods for describing and manipulating offsets, as described in
KIP-1071
* adds corresponding unit tests
These are doing the exact same thing as the corresponding consumer group
counter-parts.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In this PR, we perform this refactor because the class is not needed:
there is no need to refer to the child classes by a common reference, and
the duplicated code is minimal.
Reviewers: Andrew Schofield <aschofield@confluent.io>
In KRaft mode, custom KafkaPrincipalBuilder instances must implement KafkaPrincipalSerde. This PR updates all related documentation to highlight this requirement.
Reviewers: Ken Huang <s7133700@gmail.com>, David Jacot <djacot@confluent.io>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1. Convert the end txn marker schema to use the auto-generated protocol `EndTxnMarker`.
2. Substitute `CURRENT_END_TXN_MARKER_VALUE_SIZE` with an `endTnxMarkerValueSize` method since the size is accumulated from `EndTxnMarker`.
3. Add a buffer to `EndTransactionMarker` to avoid computing twice from `serializeValue` and `endTnxMarkerValueSize`.
4. flexibleVersions is set to none.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1. Client support for TopicAuthException in the DescribeShareGroup and heartbeat
paths.
2. ShareConsumerImpl#sendAcknowledgementsAndLeaveGroup swallows
TopicAuthorizationException and GroupAuthorizationException.
Reviewers: ShivsundarR <shr@confluent.io>, Andrew Schofield <aschofield@confluent.io>
The AbstractHeartbeatRequestManager and the
StreamsGroupHeartbeatRequestManager both use the
HeartbeatRequestState to track the state of the heartbeat requests. Both
heartbeat request managers have an implementation of
HeartbeatRequestState as an inner class.
To deduplicate code, this commit extracts the HeartbeatRequestState so
that the same code can be used by both heartbeat request managers.
Reviewers: Kirk True <ktrue@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
There are (at least) three open issues about multithreaded access to ConsumerRecords, so it would be better to document this behavior completely.
Reviewers: Kirk True <ktrue@confluent.io>, TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Xuan-Zhang Gong <gongxuanzhangmelt@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The purpose of this PR is to remove the `@InterfaceStability.Evolving` from classes that were created over a year ago.
Reviewers: Jun Rao <junrao@gmail.com>
- Added a unit test to validate the exception hierarchy for all KIP-1050
transaction-related exceptions.
- RetriableException is correctly extended by all child classes.
- Included a test verifying that all exceptions extending
`RetriableException` do not inadvertently extend
`RefreshRetriableException`, preserving the intended behavior.
Reviewers: Kirk True <ktrue@confluent.io>, TaiJuWu <tjwu1217@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Justine Olshan <jolshan@confluent.io>
KIP-966 adds a strict min ISR rule, so this PR improves the docs of min.insync.replicas to include that change.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This patch filters out the topic describe unauthorized topics from the
ConsumerGroupHeartbeat and ConsumerGroupDescribe response.
In ConsumerGroupHeartbeat,
- if the request has `subscribedTopicNames` set, we directly check the
authz in `KafkaApis` and return a topic auth failure in the response if
any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered, and
we do it based on the ACL of the consumer that triggered the refresh. If
any of the topics is denied, we filter it out from the resolved
subscription.
In ConsumerGroupDescribe, we check the authz of the coordinator
response. If any of the topics in the group is denied, we remove the
described info and add a topic auth failure to the described group
(similar to the group auth failure).
Reviewers: David Jacot <djacot@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
TengYao Chi <kitingiao@gmail.com>
For the KRaft implementation there is a race between the network thread,
which reads bytes from the log segments, and the KRaft driver thread, which
truncates the log and appends records to the log. This race can cause
the network thread to send corrupted records or inconsistent records.
The corrupted records case is handled by catching and logging the
CorruptRecordException. The inconsistent records case is handled by only
appending record batches whose partition leader epoch is less than or
equal to the fetching replica's epoch, and only when the epoch didn't change
between the request and response.
For the ISR implementation there is also a race between the network
thread and the replica fetcher thread, which truncates the log and
appends records to the log. This race can cause the network thread to send
corrupted records or inconsistent records. The replica fetcher thread
already handles the corrupted records case. The inconsistent records case
is handled by only appending record batches whose partition leader epoch
is less than or equal to the leader epoch in the FETCH request.
Reviewers: Jun Rao <junrao@apache.org>, Alyssa Huang <ahuang@confluent.io>, Chia-Ping Tsai <chia7712@apache.org>
- Currently, if we received extraneous topic partitions in the response,
or if the response was missing some of the requested partitions, we
processed the response as it came and even populated the callback with
these partitions.
- These invalid responses should be handled in the
`ShareConsumeRequestManager`.
- If the response missed any acknowledgements for partitions that were
requested, then we fail the request with `InvalidRecordStateException`
and populate the callbacks.
- For any extraneous partitions in the response, we log an error and
ignore them.
Some refactors are also done in this PR in ShareConsumeRequestManager to
make the code more readable.
Reviewers: Andrew Schofield <aschofield@confluent.io>
* In this PR, we add various infra classes needed to support the
`deleteShareGroups` functionality via the `kafka-share-groups.sh`
script, as well as the implementation of `kafka-share-groups.sh --delete`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Since we no longer convert records to the old format for fetch requests, this code is no longer used in production.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Cleanup code to avoid rawtype, and add suppressions where necessary.
Change the build to fail on rawtype warning.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
3.3.0 was the first KRaft release that was deemed production-ready and also
when KIP-778 (KRaft to KRaft upgrades) landed. Given that, it's reasonable
for 4.x to only support upgrades from 3.3.0 or newer (the metadata version also
needs to be set to "3.3" or newer before upgrading).
Noteworthy changes:
1. `AlterPartition` no longer includes topic names, which makes it possible to
simplify `AlterPartitionManager` logic.
2. Metadata versions older than `IBP_3_3_IV3` have been removed and
`IBP_3_3_IV3` is now the minimum version.
3. `MINIMUM_BOOTSTRAP_VERSION` has been removed.
4. Removed `isLeaderRecoverySupported`, `isNoOpsRecordSupported`,
`isKRaftSupported`, `isBrokerRegistrationChangeRecordSupported` and
`isInControlledShutdownStateSupported` - these are always `true` now.
Also removed related conditional code.
5. Removed default metadata version or metadata version fallbacks in
multiple places - we now fail-fast instead of potentially using an incorrect
metadata version.
6. Update `MetadataBatchLoader.resetToImage` to set `hasSeenRecord`
based on whether image is empty - this was a previously existing issue that
became more apparent after the changes in this PR.
7. Remove `ibp` parameter from `BootstrapDirectory`
8. A number of tests were not useful anymore and have been removed.
I will update the upgrade notes via a separate PR as there are a few things that
need changing and it would be easier to do so that way.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Justine Olshan <jolshan@confluen.io>, Ken Huang <s7133700@gmail.com>
This commit adds the Streams group heartbeat request manager
to the async consumer. The Streams group heartbeat request
manager is responsible for sending heartbeat requests and
processing their responses.
This commit implements:
- sending of full heartbeat request (independent of any state)
- processing successful response
Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Introduced two new exception classes to the Kafka error handling framework:
ApplicationRecoverableException: This exception signals that the error is recoverable, but the producer needs to be restarted. It helps in scenarios where recovery actions (like re-balancing or restoring from checkpoints) are needed.
RefreshRetriableException: This exception occurs when metadata is outdated or invalid and needs to be refreshed before retrying the request. It helps handle retries that depend on updated metadata.
Both classes are abstract and in upcoming PRs they will be extended by relevant classes as mentioned in KIP-1050:Exception Table.
Reviewers: Justine Olshan <jolshan@confluent.io>, Sanskar Jhajharia <jhajharia.sanskar@gmail.com>
Frequently updating the trust store can cause unexpected termination of the AsyncConsumer background thread.
1. To resolve this issue, reuse the same AdminClient instead of recreating it.
2. Add error logging when failing to initialize resources for the consumer network thread.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KAFKA-16720 aims to add support for AlterShareGroupOffsets in the AdminClient. Key changes in the PR:
1. Added handling of alterShareGroupOffsets() in KafkaAdminClient and introduced the AlterShareGroupOffsetRequest/AlterShareGroupOffsetResponse/AlterShareGroupOffsetsOptions classes.
2. Corresponding test in KafkaAdminClientTest.
3. Added the ALTER_SHARE_GROUP_OFFSETS API (it will be finished in the next PR along with the share coordinator pieces).
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Most of the changes are obvious clean-ups/fixes. A couple of noteworthy items:
1. Support for non LTS versions is clarified (we were incorrectly stating full support
for Java 23).
2. TLS version negotiation details are clarified.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit ensures that the ClientQuotaCallback#updateClusterMetadata method is executed in KRaft mode. This method is triggered whenever a topic or cluster metadata change occurs. However, in KRaft mode, the current implementation of the updateClusterMetadata API is inefficient due to the requirement of creating a full Cluster object. To address this, a follow-up issue (KAFKA-18239) has been created to explore more efficient mechanisms for providing cluster information to the ClientQuotaCallback without incurring the overhead of a full Cluster object creation.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The Streams membership manager is used client-side in the
background thread of the async consumer. For each member
/consumer, it is responsible for:
* keeping the member state,
* keeping assignments for the member,
* reconciling the assignments of the member -- for example
when tasks need to be revoked before other tasks are assigned
* requesting invocations of assignment and revocation callbacks
by the stream thread.
The Streams membership manager is called by the background thread of
the async consumer, directly in its event loop and from the
Streams group heartbeat request manager. The Streams membership
manager uses the Streams rebalance events processor to request
assignment/revocation callback in the stream thread.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bill Bejeck <bill@confluent.io>
Return produce v0-v2 as supported versions in `ApiVersionsResponse`, but disable support
for it everywhere else.
Since clients pick the highest supported version by both client and broker during version
negotiation, this solves the problem with minimal tech debt (even though it's not ideal that
`ApiVersionsResponse` becomes inconsistent with the actual protocol support).
Add one test for the socket server handling (in `ProcessorTest`) and one test for the
client behavior (in `ProduceRequestTest`). Adjust a couple of api versions tests to verify
the new behavior.
Finally, include a few clean-ups in `ApiKeys`, `Protocol`, `ProduceRequest`,
`ProduceRequestTest` and `BrokerApiVersionsCommandTest`.
Reference to related librdkafka issue:
https://github.com/confluentinc/librdkafka/issues/4956
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
The PR corrects the check which was introduced in #5332, where the position is checked to be within the boundaries of the file. The check
`position > currentSizeInBytes - start`
is incorrect, since the position is relative to start.
Reviewers: Jun Rao <junrao@gmail.com>
This change reduces fetch session cache evictions on the broker for AsyncKafkaConsumer by altering its logic to determine which partitions it includes in fetch requests.
Background
Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.
The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally-buffered data, the ClassicKafkaConsumer does not fetch any new data and simply returns the buffered data to the user from poll().
On the other hand, the AsyncKafkaConsumer consumer splits its logic and network I/O between two threads, which results in a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread. If it finds there is none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, as the background thread creates a new fetch request, it skips any buffered data, which has the unintended result that those partitions get added to the fetch request's "to remove" set. This signals to the broker to remove those partitions from its internal cache.
This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen.
Options
The core decision is: what should the background thread do if it is asked to create a fetch request and it discovers there's buffered data. There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them are:
1. The background thread should omit buffered partitions from the fetch request as before (this is the existing behavior)
2. The background thread should skip the fetch request generation entirely if there are any buffered partitions
3. The background thread should include buffered partitions in the fetch request, but use a small “max bytes” value
4. The background thread should skip fetching from the nodes that have buffered partitions
Option 4 won out. The change is localized to AbstractFetch where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, it won't have any "holes" where the buffered partitions should be.
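A rough sketch of that idea, using placeholder helpers rather than the actual AbstractFetch internals:
```java
// Sketch only: the real logic lives in AbstractFetch and works against the
// consumer's fetch buffer and cluster metadata.
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

final class BufferedNodeSkipSketch {
    // Nodes to exclude from this fetch round: any node that leads a partition
    // for which data is already buffered locally.
    static Set<Node> nodesToSkip(Set<TopicPartition> bufferedPartitions,
                                 Function<TopicPartition, Node> leaderFor) {
        Set<Node> skip = new HashSet<>();
        for (TopicPartition tp : bufferedPartitions) {
            Node leader = leaderFor.apply(tp);
            if (leader != null)
                skip.add(leader);
        }
        return skip;
    }
}
```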
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jun Rao <junrao@gmail.com>
While testing, it was found that the not_enough_replicas error was super common and could be easily confused. Since we are already bumping the request, we can signify that the produce request may return this error and new clients can handle it
(Note, the java client should be able to handle this already as a retriable error, but other client libraries may need to implement this change)
Reviewers: Justine Olshan <jolshan@confluent.io>
Ensure we always return empty records (including cases where an error is returned).
We also remove `nullable` from `records` since it is effectively expected to be
non-null by a large percentage of clients in the wild.
This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
previously set via `FetchResponse.recordsOrFail(partitionData)` in the
now-removed `maybeConvertFetchedData` method.
Added an integration test that fails without this fix and also updated many
tests to set `records` to `empty` instead of leaving them as `null`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
During testing, we identified that kafka-python (and aiokafka) relies on metadata request v0 and
hence we need to add these back to comply with the premise of KIP-896 - i.e. it should not
break the clients listed within it.
I reverted the changes from #18218 related to the removal of metadata versions 0-3.
I will submit a separate PR to undeprecate these API versions on the relevant 3.x branches.
kafka-python (and aiokafka) work correctly (produce & consume) with this change on
top of the 4.0 branch.
Reviewers: David Arthur <mumrah@gmail.com>
The following tests were previously reported as flaky but were only annotated with a comment in pull request #18558 due to module dependency limitations:
testAdminClientApisAuthenticationFailure
testOutdatedCoordinatorAssignment
testThrottledProducerConsumer
With the introduction of the new test infrastructure #18602 , which allows all modules to use the @Flaky annotation, these tests should now be updated to include the @Flaky annotation.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch reorganizes our test infrastructure into three Gradle modules:
":test-common:test-common-internal-api" is now a minimal dependency which exposes interfaces and annotations only. It has one project dependency on server-common to expose commonly used data classes (MetadataVersion, Feature, etc). Since this pulls in server-common, this module is Java 17+. It cannot be used by ":clients" or other Java 11 modules.
":test-common:test-common-util" includes the auto-quarantined JUnit extension. The @Flaky annotation has been moved here. Since this module has no project dependencies, we can add it to the Java 11 list so that ":clients" and others can utilize the @Flaky annotation
":test-common:test-common-runtime" now includes all of the test infrastructure code (TestKitNodes, etc). This module carries heavy dependencies (core, etc) and so it should not normally be included as a compile-time dependency.
In addition to this reorganization, this patch leverages JUnit SPI service discovery so that modules can utilize the integration test framework without depending on ":core". This will allow us to start moving integration tests out of core and into the appropriate sub-module. This is done by adding ":test-common:test-common-runtime" as a testRuntimeOnly dependency rather than as a testImplementation dependency. A trivial example was added to QuorumControllerTest to illustrate this.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
Currently, the Producer.send doc is inconsistent with the actual exception behavior:
- TimeoutException: This is not actually thrown from send on buffer-full or missing metadata. Instead, it is returned as a failed future.
- AuthenticationException/AuthorizationException: These exceptions are also not thrown; they are returned via a failed future.
Fixed the Callback javadoc and ProducerConfig doc as well.
Reviewers: Luke Chen <showuon@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit adds a processor named
StreamsRebalanceEventsProcessor that handles the rebalance
events sent from the background thread of the async
consumer to the stream thread when a task
assignment changes. It also adds the corresponding rebalance
events.
Additionally, this commit adds StreamsRebalanceData that
maintains the data that is exchanged for the Streams rebalance
protocol.
All of these are used by the Streams heartbeat request manager
and the Streams membership manager that will be added in a future
commit.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This patch does a few things:
1) Replace ApiMessageAndVersion by ApiMessage in CoordinatorRecord for the key
2) Leverage the fact that ApiMessage exposes the apiKey. Hence we don't need to specify the key anymore.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Update producer id request / response formats and transaction log value format. There is no functional change.
Reviewers: Justine Olshan <jolshan@confluent.io>, Calvin Liu <caliu@confluent.io>
We should provide the same informative error message for both timeout
cases.
Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Kafka 4.0 will remove support for zk mode and will require conversion to kraft
before upgrading to 4.0. The minimum kraft version is 3.0 (aka 3.0-IV1).
This provides an opportunity to remove exclusively server side protocols versions
that only exist to allow direct upgrades from versions older than 3.0 or that are
used only by zk mode.
Since KRaft became production ready in 3.3, we should consider setting the
baseline to 3.3. But that requires more discussion and it can be done via a
separate change (KAFKA-18601).
Protocol changes:
* Remove RequestHeader v0 (only used by ControlledShutdown v0)
* Remove WriteTxnMarkers v0
* Remove all versions of ControlledShutdown, LeaderAndIsr, StopReplica, UpdateMetadata
In order to remove all versions safely, extend generator to support setting
"versions" to "none". In this case, we no longer generate the `*Data` classes,
but we still reserve the id for the relevant protocol api (so it doesn't get
accidentally used for something else). The protocol documentation is correct
after these changes.
We kept a simplified version of `LeaderAndIsr{Request|Response}` because
it's used by many tests that are still relevant in kraft mode. Once
KAFKA-18486 is done, it may be possible to remove it (I left a comment on
the ticket). Similarly, KAFKA-18487 may make it possible to remove
the introduced `StopReplicaPartitionState` (left a comment on that ticket too).
There are a number of places that were adjusted to include an
`ApiKeys.hasValidVersion` check.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR implements the second part of KIP-996 and KAFKA-16164 (tasks KAFKA-16607, KAFKA-17642, KAFKA-17643, KAFKA-17675) which encompass the response handling of PreVotes, addition of new ProspectiveState, update to metrics, and addition of Raft simulation tests.
Voters now transition to ProspectiveState first before CandidateState to prevent unnecessary epoch bumps. Voters in ProspectiveState send PreVotes requests which are Vote requests with PreVote set to true.
Follower grants PreVotes if it has not yet fetched successfully from the leader. Leader denies all PreVotes. Unattached, Prospective, Candidate, and Resigned will grant PreVotes if the requesting replica's log is at least as long as theirs. Granted PreVotes are not persisted like standard votes. It is possible for a voter to grant several PreVotes in the same epoch.
The only state which is allowed to transition directly to CandidateState is ProspectiveState. This happens on reception of majority of granted PreVotes or if at least one voter doesn't support PreVote requests.
Prospective will transition to Follower after election loss/timeout if it was already aware of last known leader and the leader's endpoint, or at any point if it discovers the leader.
Prospective will transition to Unattached after election loss/timeout if it does not know the leader endpoints.
After electionTimeout, Resigned now always transitions to Unattached and increases the epoch.
Prospective grants standard votes if it has not already granted a standard vote (no votedKey), has no leaderId, and the recipient's log is current enough.
Candidate no longer backs off after election timeout. Candidate still backs off after election loss.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Since the example.com DNS lookup changed for the second time within one
year, we rewrote the unit tests for ClientUtils so that they do
not make a real DNS lookup to the outside but use mocks instead.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
Fix the issue where producer.commitTransaction under transaction version 2 throws an error if no partition or offset has been added to the transaction. The solution is to avoid sending the EndTxnRequest unless producer.send or producer.sendOffsetsToTransaction has been called.
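The failing scenario, sketched with the standard producer transaction API (config values are placeholders):
```java
// Sketch of the scenario fixed here: with transaction version 2, committing a
// transaction that never added partitions or offsets previously failed.
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class EmptyTransactionExample {
    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            producer.initTransactions();
            producer.beginTransaction();
            // No send() and no sendOffsetsToTransaction() in between.
            producer.commitTransaction(); // previously threw an error under TV2
        }
    }
}
```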
Reviewers: Justine Olshan <jolshan@confluent.io>
SASL mechanisms that support neither integrity nor confidentiality should throw an exception on wrap/unwrap.
The current implementation does not implement wrap/unwrap correctly.
This may cause security issues if the code using the mechanisms does
not check the QOP correctly.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Igor Soarez <i@soarez.me>
Apache Kafka 4.0 will only support KRaft and 3.0-IV1 is the minimum version supported by KRaft. So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0-IV1 or newer.
Note that KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non controversial parts.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <david.jacot@gmail.com>
KIP-863 introduced a change to ByteBufferDeserializer which is not
properly documented, but should be called out because it could surface
bugs in application code which uses ByteBufferDeserializer.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
A producer might get stuck after it was throttled. This PR unblocks the producer by polling again
after pollDelayMs in NetworkUtils#awaitReady().
Reviewers: Matthias J. Sax <matthias@confluent.io>, David Jacot <djacot@confluent.io>
Convert v0/v1 record batches to v2 during compaction even if said record batches would be
written with no change otherwise. A few important details:
1. V0 compressed record batch with multiple records is converted into single V2 record batch
2. V0 uncompressed records are converted into single record V2 record batches
3. V0 records are converted to V2 records with timestampType set to `CreateTime` and the
timestamp is `-1`.
4. The `KAFKA-4298` workaround is no longer needed since the conversion to V2 fixes
the issue too.
5. Removed a log warning applicable to consumers older than 0.10.1 - they are no longer
supported.
6. Added back the ability to append records with v0/v1 (for testing only).
7. The creation of the leader epoch cache is no longer optional since the record version
config is effectively always V2.
Added integration tests; these tests existed before #18267 and were restored, modified and
extended.
Reviewers: Jun Rao <jun@confluent.io>
The consumer/producer JavaDocs still contain instructions for naively
computing the offset to be committed.
This PR updates the JavaDocs with regard to the improvements of KIP-1094.
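A sketch of the recommended pattern, assuming the `ConsumerRecords#nextOffsets()` method added by KIP-1094:
```java
// Sketch only; replaces the naive "record.offset() + 1" per-partition computation.
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitAfterPollSketch {
    static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // ... process records ...
        // Use the next offsets (including leader epochs) that the consumer
        // already tracks instead of computing them by hand.
        Map<TopicPartition, OffsetAndMetadata> toCommit = records.nextOffsets();
        consumer.commitSync(toCommit);
    }
}
```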
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
Relevant methods:
1. `List.of`, `Set.of`, `Map.of` and similar (introduced in Java 9)
2. Optional: `isEmpty` (introduced in Java 11), `stream` (introduced in Java 9).
Reviewers: Mickael Maison <mimaison@users.noreply.github.com>
This is no longer required since we dropped support for Java 8. Also update `NOTICE*` and
`spotbugs-exclude.xml` files.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
The PR implements the behaviour defined in KIP-1109. It corrects the consumer topic and topic-partition metrics, while deprecating the incorrect ones.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>
A minor refactoring just before merging #18295 introduced a regression and no test failed. Throw the correct exception and add test to verify it. Also refactor the code slightly to make that possible.
Thanks to Chia-Ping for catching the issue.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add specific error handling for unsupported version in share consumer and consumer
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Clients that support SASL but don't implement KIP-43 (eg Kafka producer/consumer 0.9.0.x) will
fail to connect after this change.
Added unit tests and also manually tested with the console producer 0.9.0.
While testing, I noticed that the logged message when a 0.9.0 Java client is used without sasl is
slightly misleading - fixed that too.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This makes it possible to enable request logs for deprecated protocol api versions without enabling it for the rest. Combined with the ability to enable/disable dynamically, it makes it a bit easier to collect the information about deprecated clients that is not available via metrics.
This isn't particularly useful in trunk/4.0 since there are no deprecated api versions in these versions, but it will be useful for older branches. I intend to backport to those branches and add a release note in the backport regarding the change in behavior.
I manually verified that:
1. If the request logger is configured at `INFO` level, only deprecated protocol api versions are logged and they are logged at `INFO` level.
2. If the request logger is configured at `DEBUG` level, all requests are logged but the log level is `INFO` for deprecated protocol api versions and `DEBUG` for the rest.
3. If the request logger is configured at `WARN` level (the default), no requests are logged.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Added transaction version 2 to some of the system tests. Also marking TV2 as production ready.
Also fixes the defaultVersion test.
Reviewers: Jun Rao <jun@confluent.io>
Included in this change:
1. Remove deprecated protocol api versions from json files.
2. Remove fields that are no longer used from json files (affects ListOffsets, OffsetCommit, DescribeConfigs).
3. Remove record down-conversion support from KafkaApis.
4. No longer return `Errors.UNSUPPORTED_COMPRESSION_TYPE` on the fetch path[1].
5. Deprecate `TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG` and make the relevant
configs (`message.downconversion.enable` and `log.message.downconversion.enable`) no-ops since
down-conversion is no longer supported. It was an oversight not to deprecate this via KIP-724.
6. Fix `shouldRetainsBufferReference` to handle null request schemas for a given version.
7. Simplify producer logic since it only supports the v2 record format now.
8. Fix tests so they don't exercise protocol api versions that have been removed.
9. Add upgrade note.
Testing:
1. System tests have a lot of failures, but those tests fail for trunk too and I didn't see any issues specific to this change - it's hard to be sure given the number of failing tests, but let's not block on that given the other testing that has been done (see below).
2. Java producers and consumers with version 0.9-0.10.1 don't have api versions support and hence they fail in an ungraceful manner: the broker disconnects and the clients reconnect until the relevant timeout is triggered.
3. The same thing seems to happen for the console producer 0.10.2, although it's unclear why since api versions should be supported. I will look into this separately, it's unlikely to be related to this PR.
4. Console consumer 0.10.2 fails with the expected error and a reasonable message[2].
5. Console producer and consumer 0.11.0 work fine, and newer versions should naturally also work fine.
6. kcat 1.5.0 (based on librdkafka 1.1.0) produce and consume fail with a reasonable message[3][4].
7. kcat 1.6.0-1.7.0 (based on librdkafka 1.5.0 and 1.7.0 respectively) consume fails with a reasonable message[5].
8. kcat 1.6.0-1.7.0 produce works fine.
9. kcat 1.7.1 (based on librdkafka 1.8.2) works fine for consume and produce.
10. confluent-go-client (librdkafka based) 1.8.2 works fine for consume and produce.
11. I will test more clients, but I don't think we need to block the PR on that.
Note that this also completes part of KIP-724: produce v2 and lower as well as fetch v3 and lower are no longer supported.
Future PRs will remove conditional code that is no longer needed (some of that has been done in KafkaApis,
but only what was required due to the schema changes). We can probably do that in master only as it does
not change behavior.
Note that I did not touch `ignorable` fields even though some of them could have been
changed. The reasoning is that this could result in incompatible changes for clients
that use new protocol versions without setting such fields _if_ we don't manually
validate their presence. I will file a JIRA ticket to look into this carefully for each
case (i.e. if we do validate their presence for the appropriate versions, we can
set them to ignorable=false in the json file).
[1] We would return this error if a fetch < v10 was used and the compression topic config was set
to zstd, but we would not do the same for the case where zstd was compressed at the producer
level (the most common case). Since there is no efficient way to do the check for the common
case, I made it consistent for both by having no checks.
[2] ```org.apache.kafka.common.errors.UnsupportedVersionException: The broker is too new to support JOIN_GROUP version 1```
[3]```METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[4]```METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[5] `ERROR: Topic test-topic [0] error: Failed to query logical offset END: Local: Required feature not supported by broker`
Reviewers: David Arthur <mumrah@gmail.com>
Librdkafka totally breaks if produce v3 is removed - it starts sending records with record format v0.
These api versions have to be undeprecated - KIP-896 has been updated.
Reviewers: David Arthur <mumrah@gmail.com>
This PR adds `entityType: topicName` to SubscribedTopicNames in ShareGroupHeartbeatRequest.json as per the recent update to KIP-932.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This patch is the first one in a series to improve how coordinator records are managed. It focuses on making coordinator records first-class citizens in the generator.
* Introduce `coordinator-key` and `coordinator-value` in the schema;
* Introduce `apiKey` for those. This is done to avoid relying on the version to determine the type.
* It also allows the generator to enforce some rules: the key cannot use flexible versions, the key must have a single version `0`, there must be a key and a value for a given api key, etc.
* It generates an enum with all the coordinator record types. This is pretty handy in the code.
The patch also updates the group coordinators to use those.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Andrew Schofield <aschofield@confluent.io>
We want to bump the epoch if we are upgrading to TV2. Given that we already have code in place for this, I thought we could piggyback on the completing-transaction epoch bump logic. For just initializing producers, I moved the check to the end of InitTransaction. Note, we have to do this check after we initialize the producer ID to ensure we have updated ApiVersions correctly.
Reviewers: Calvin Liu <caliu@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
Adds a new RPC StreamsGroupDescribe that returns, given the group ID, all metadata related to the streams group, such as
- The topology metadata of the group.
- The topology epoch of the group.
- The latest member metadata that each member provided through the StreamsGroupHeartbeat API.
- The current target assignment generated by the assignor.
- This just adds the JSON as defined in KIP-1071, together with some plumbing.
Reviewers: Bill Bejeck <bbejeck@gmail.com>
The StreamsGroupHeartbeat API is the new core API used by streams application to form a group. The API allows members to initialize a topology, advertise their state, and their owned tasks. The group coordinator uses it to assign/revoke tasks to/from members. This API is also used as a liveness check.
This change adds the JSON definition of the RPC, as defined in KIP-1071.
Reviewers: Bruno Cadonna <cadonna@apache.org>
- Disallow configuring -1 for copier and expiration thread pools dynamically
Co-authored-by: Peter Lee <peterxcli@gmail.com>
Reviewers: Peter Lee <peterxcli@gmail.com>, Satish Duggana <satishd@apache.org>
Introduces v2 of Vote RPC and implements the handling of the new version of the RPC.
Many references to "candidate" in the Vote RPC are changed to the more generic "replica". Replicas sending Vote requests with PreVote set to true are not candidates; they are instead prospective candidates that are attempting to become candidates.
Replicas receiving PreVote requests (vote request with PreVote=true) with an epoch equal to their own will _not_ transition to Unattached state. They will only grant the vote if they have not recently fetched from leader and the request's last epoch and offset are up-to-date with theirs.
If a replica receives a PreVote request with an epoch greater than their current epoch, they will transition to Unattached state (setting their epoch to the one from the pre-vote request) and then grant the vote if the request's last epoch and offset are up-to-date with theirs.
To avoid a possible ping-pong scenario, consider this example: in a 3-node quorum, leader node A disconnects from the quorum, node B goes into prospective state before node C, node B sends a pre-vote request to node C (still in follower state) and receives back that node A is leader, so node B transitions to follower while node C transitions to prospective after its election timeout. If this interaction repeats, it is possible for such replicas to transition from Follower to Prospective in perpetuity. This issue is resolved by having follower-state nodes grant pre-vote requests only if they have successfully fetched from the leader at least once after becoming a follower.
This change introduces a new suite called KafkaRaftClientPreVoteTest, for additional KRaft protocol tests with respect to pre-vote.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This pull request replaces Log4j with Log4j2 across the entire project, including dependencies, configurations, and code. The notable changes are listed below:
1. Introduce Log4j2 Instead of Log4j
2. Change Configuration File Format from Properties to YAML
3. Adds warnings to notify users if they are still using Log4j properties, encouraging them to transition to Log4j2 configurations
Co-authored-by: Lee Dongjin <dongjin@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When a struct field is tagged and nullable, it is serialized as
{ varint tag; varint dataLength; nullable data }, where
nullable is serialized as
{ varint isNotNull; if (isNotNull) struct s; }. The length field
includes the is-not-null varint.
This patch fixes a bug in serialization where the written value of
the length field and the value used to compute the size of the length
field differs by 1. In practice this has no impact unless the
serialized length of the struct is 127 bytes, since the varint encodings
of 127 and 128 have different lengths (0x7f vs 0x80 01).
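To see why 127 bytes is the boundary, here is a small self-contained illustration of unsigned varint sizing (mirroring the standard varint encoding, not Kafka's internal utility class):
```java
final class VarintSizeExample {
    // Number of bytes needed to encode value as an unsigned varint
    // (7 payload bits per byte).
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(sizeOfUnsignedVarint(127)); // 1 byte (0x7f)
        System.out.println(sizeOfUnsignedVarint(128)); // 2 bytes (0x80 0x01)
        // If the written length and the length used for size computation differ
        // by 1 across this boundary, the size of the length field itself is wrong.
    }
}
```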
Reviewers: David Jacot <djacot@confluent.io>
Implementation of KIP-1073: Return fenced brokers in DescribeCluster response.
Add new unit and integration tests for describeCluster.
Reviewers: Luke Chen <showuon@gmail.com>
Part of KIP-1106.
Adds support for "by_duration" and "none" reset strategy
to the Kafka Streams runtime.
Reviewers: Bill Bejeck <bill@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Fixed logic in markCoordinatorUnknown to ensure the warning log contains the correct number of milliseconds the client has been disconnected.
Reviewers: Christo Lolov <lolovc@amazon.com>
* Add fields `groupEpoch` and `targetAssignmentEpoch` to `ConsumerGroupDescription.java`.
* Add fields `memberEpoch` and `upgraded` to `MemberDescription.java`.
* Add assertion to `PlaintextAdminIntegrationTest#testDescribeClassicGroups` to make sure member in classic group returns `upgraded` as `Optional.empty`.
* Add new case `testConsumerGroupWithMemberMigration` to `PlaintextAdminIntegrationTest` to make sure migration member has correct `upgraded` value. Add assertion for `groupEpoch`, `targetAssignmentEpoch`, `memberEpoch` as well.
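A hedged usage sketch of the new fields, assuming they are exposed via `groupEpoch()`, `targetAssignmentEpoch()`, `memberEpoch()` and `upgraded()` accessors:
```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;

public class DescribeGroupEpochsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConsumerGroupDescription description = admin
                    .describeConsumerGroups(List.of("my-group"))
                    .describedGroups()
                    .get("my-group")
                    .get();
            // Accessor names assumed from the field names above.
            System.out.println("group epoch: " + description.groupEpoch());
            System.out.println("target assignment epoch: " + description.targetAssignmentEpoch());
            for (MemberDescription member : description.members()) {
                System.out.println(member.consumerId()
                        + " memberEpoch=" + member.memberEpoch()
                        + " upgraded=" + member.upgraded());
            }
        }
    }
}
```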
Reviewers: David Jacot <djacot@confluent.io>
Signed-off-by: PoAn Yang <payang@apache.org>
Improve descriptive information in Kafka protocol documentation.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Removes the client side AddPartitionsToTxn/AddOffsetsToTxn calls so that the partition is implicitly added as part of KIP-890 part 2.
This change also requires updating the valid state transitions. The client side can not know for certain if a partition has been added server side when the request times out (partial completion). Thus for TV2, the transition to PrepareAbort is now valid for Empty, CompleteCommit, and CompleteAbort.
For readability, the V1 and V2 endTransaction methods have been separated.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ritika Reddy <rreddy@confluent.io>
This PR introduces the DescribeGroups v6 API as part of KIP-1043. This adds an error message for the described groups so that it is possible to get some context on the error. It also changes the behaviour when the group ID cannot be found, returning error code GROUP_ID_NOT_FOUND rather than NONE.
Reviewers: David Jacot <djacot@confluent.io>
In the response handler for ShareAcknowledge, we are passing the clientResponse.receivedTimeMs() to the handler methods. But when there is a disconnect or when the response received is null, we should be passing the current time instead.
This bug was causing the consumer to hang, as it did not call the handler methods on disconnect, and further requests were blocked waiting for its completion.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>