ShareGroupStateMessageFormatter should extend
CoordinatorRecordMessageFormatter in order to have a consistent handling
of records of coordinators.
Reviewers: Ken Huang <s7133700@gmail.com>, David Jacot <djacot@confluent.io>
This is a small follow-up of https://github.com/apache/kafka/pull/19290.
The `actionQueue` argument is only used by the
`CoordinatorPartitionWriter` so we can remove it from the other methods
now.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
[JIRA](https://issues.apache.org/jira/browse/KAFKA-18959)
This PR adjusts the default node count for system tests since some tests
are using more than 9 nodes.
This change ensures such test cases can be run successfully without
manual configuration.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch moves `ConsumerTopicCreationTest` to the
`client-integration-tests` and rewrite it as Java.
The patch also streamlines the test flow.
In the Scala version, there is a producer that produces messages, but
this is not the main purpose of the `ConsumerTopicCreationTest`.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The `window.size.ms` and `window.inner.class.serde` are not a true
KafkaStreams config, and are ignored when set from a KStreams
application. Both belong on the client.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Signed-off-by: PoAn Yang <payang@apache.org>
Basic streams group heartbeat handling. The main part of are the unit
tests that make sure that we behave, for the most part, like a consumer
group.
- No support for static membership
- No support for configurations (using constants instead)
- No support for regular expressions
Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna
<cadonna@apache.org>
Enhance the documentation for Admin#describeCluster and
Admin#describeConfigs to clarify their behavior when using
bootstrap.controllers and bootstrap.servers.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The zookeeper mode was removed in 4.0. The test cases don't need to
specify quorum. Following variable and functions can be replaced:
- TestWithParameterizedQuorumAndGroupProtocolNames
- getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
- getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly
- getTestQuorumAndGroupProtocolParametersAll
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch addresses a weirdness on the GroupCoordinator write path. The
`CoordinatorPartitionWriter` uses the `ReplicaManager#appendRecords`
method with `acks=1` and it expects it to completes
immediately/synchronously. It works because this is effectively what the
method does with `acks=1`. The issue is that fundamentally the method is
asynchronous so the contract is really fragile. This patch changes it by
introducing new method `ReplicaManager.appendRecordsToLeader`, which is
synchronous. It also refactors `ReplicaManager#appendRecords` to use
`ReplicaManager.appendRecordsToLeader` so we can benefits from all the
existing tests.
Reviewers: Fred Zheng <fzheng@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This patch applies a few cleanups to uniformize how group coordinator's
integration tests are setup.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Move LogCleanerManager and related classes to storage module and rewrite
in Java.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao
<junrao@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
* This PR adds impl for the initialize share groups call from the Group
Coordinator perspective.
* The initialize call on persister instance will be invoked by the
`GroupCoordinatorService`, based on the response of the
`GroupCoordinatorShard.shareGroupHeartbeat`. If there is new topic
subscription or member assignment change (topic paritions incremented),
the delta share partitions corresponding to the share group in question
are returned as an optional initialize request.
* The request is then sent to the share coordinator as an encapsulated
timer task because we want the heartbeat response to go asynchronously.
* Tests have been added for `GroupCoordinatorService` and
`GroupMetadataManager`. Existing tests have also been updated.
* A new formatter `ShareGroupStatePartitionMetadataFormatter` has been
added for debugging.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The generated request data type's constructors take Readable as an input. However, the parse method in the
AbstractRequest takes a ByteBuffer as input. So to create the corresponding request data objects, each individual
concrete Request classes wraps the ByteBuffer into a ByteBufferAccessor.
This is boilerplate code present in all the concrete request classes. This changes AbstractRequest's parse method so that subclasses can simply pass the `Readable` they get directly to request data classes.
The same change is made to the serialize method to maintain symmetry.
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio
<jsancio@apache.org>, Artem Livshits <alivshits@confluent.io>,
Truc Nguyen <trnguyen@confluent.io>
This patch refactors the `ApiMessageFormatter` to follow what we have
done in https://github.com/apache/kafka/pull/18688.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Previously, the `ShareConsumer.commitAsync()` method retried sending
`ShareAcknowledge` requests indefinitely. Now it will instead use the
defaultApiTimeout config to expire the request so that it does not retry forever.
PR also fixes a bug in processing `commitSync() `requests, where we
need an additional check if the node is free.
Co-authored-by: Andrew Schofield <aschofield@confluent.io>
Reviewers: Andrew Schofield <aschofield@confluent.io>
Migrate ConsumerRebootstrapTest to the new test infra and remove the old
Scala test.
The PR changed three things.
* Migrated `ConsumerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
* Removed the `RebootstrapTest.scala`.
Default `ConsumerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
The test case for the consumer with enabled rebootstrap

The test case for the consumer with disabled rebootstrap

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR fixes a potential issue where the `FetchResponse` returns
`divergingEndOffsets` with an older leader epoch. This can lead to
committed records being removed from the follower's log, potentially
causing data loss.
In detail:
`processFetchRequest` gets the requested leader epoch of partition data
by `topicPartition` and compares it with the leader epoch of the current
fetch state. If they don't match, the response is ignored.
Reviewers: Jun Rao <junrao@gmail.com>
Add KIP-1104 link to stream upgrade guide.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>
This commit adds error handling to the Streams heartbeat request
manager.
Errors can occur while sending a heartbeat request and when a response
with an error code that is not NONE is received.
Some errors are handled explicitly to recover from them or to log
specific messages. All the others are handled as fatal errors.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This patch fixes another bug in the FindCoordinator API handling for
share partition key. `shareCoordinator.foreach` returns `Unit` so
`shareCoordinator.foreach(coordinator =>
coordinator.partitionFor(SharePartitionKey.getInstance(key)))` does not
return the partition for the key.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Given that now we support Java 17 on our brokers, this PR replace the
use of the following in server-common module:
- Collections.singletonList() and Collections.emptyList() with List.of()
- Collections.singletonMap() and Collections.emptyMap() with Map.of()
- Collections.singleton() and Collections.emptySet() with Set.of()
- Arrays.asList() with List.of()
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
JIRA: KAFKA-18576
After removing ZooKeeper, we no longer need to exclude `client_metrics`
and `group` from `ConfigType#ALL`.
Since it's a common pattern to provide a mechanism to know all values in
enumeration ( Java enum provides ootb), we should convert ConfigType to
enum.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
At the moment, the FindCoordinator API returns an `UNKNOWN_SERVER_ERROR`
error when the share partition key is invalid. It seems that the aim was
to return an `INVALID_REQUEST` error but the code has a small bug
preventing it from working as expected.
Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This PR adds extra information in assertion failed messages for tests in
SharePartitionTest revolving around acquisition lock timeouts
functionality.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Minor improvement on the error output for the storage format command. Suggests changes for a valid storage format command.
Reviewers: Justine Olshan <jolshan@confluent.io>
In past, we have `AbstractConfig#preProcessParsedConfig` but did not use
its return value
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The PR changed three things.
* Migrated `ProducerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
Default `ProducerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
The test case for the producer with enabled rebootstrap
<img width="1549" alt="Screenshot 2025-03-17 at 10 46 03 PM"
src="https://github.com/user-attachments/assets/547840a6-d79d-4db4-98c0-9b05ed04cf60"
/>
The test case for the producer with disabled rebootstrap
<img width="1552" alt="Screenshot 2025-03-17 at 10 46 47 PM"
src="https://github.com/user-attachments/assets/2248e809-d9d5-4f3b-a24f-ba1aa0fef728"
/>
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
jira: https://issues.apache.org/jira/browse/KAFKA-18980
We should use the number of `records` rather than topic partitions,but
this is not a bug as the number of `records` should be equal to the
number of topic partitions.
Also, we previously used `expiredPartitions` for logging, but now it is
not being used anymore, so we can remove it.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
`clients-integration-tests` modules doesn't have the `log4j2.yaml` to
setting log, thus we should add.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Some of these classes are generally useful for testing.
MockCoordinatorShard is already shared by SnapshottableCoordinatorTest.
Also do some minor refactors.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This patch is the third of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.
It removes the old group coordinator and cleans up the
`GroupCoordinator` interface.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
ReplicaSelector implementations can implement Monitorable to register their own metrics.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang <s7133700@gmail.com>
This patch is the second of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.
The patch removes `group.coordinator.new.enable` (internal config) and
all its usages (integration tests, unit tests, etc.). It also cleans up
`KafkaApis` to remove logic only used by the old group coordinator.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>