Since KIP-853 has been removed from the 3.8 release, do not allow quorum.bootstrap.servers to be
set outside of JUnit tests. Hide the configuration by making it internal.
Allow the committed offsets fetch to run for as long as needed. This handles the case where a user invokes Consumer.poll() with a very small timeout (including zero).
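A minimal usage sketch (hypothetical bootstrap server, group, and topic names) of the situation this addresses: a poll() call with a zero timeout should not cut the committed-offsets fetch short.
```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ZeroTimeoutPollExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical cluster
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));                     // hypothetical topic
            // Even with a zero timeout, the committed-offsets fetch is allowed to run
            // for as long as it needs across successive poll() calls instead of being
            // abandoned because the poll deadline has already passed.
            consumer.poll(Duration.ZERO);
        }
    }
}
```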
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When a node transitions from leader to follower for a partition, the tier-lag metrics should be reset to zero; otherwise they would report false positives. Also addressed a concurrency issue while emitting the metrics.
Reviewers: Satish Duggana <satishd@apache.org>, Francois Visconte <f.visconte@gmail.com>,
The listRemoteLogSegments returns the metadata list sorted by start-offset. However, the returned metadata list contains all the uploaded segment information, including duplicate and overlapping remote-log-segments. The reason for the duplicate/overlapping remote-log-segments is explained [here](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java#L103).
The list returned by RLMM#listRemoteLogSegments can contain duplicate segment metadata at the end of the list. So, while computing the next log-start-offset, we should take the maximum of (end-offset + 1) across the segments.
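A minimal sketch (hypothetical record type and method names) of the computation described above: taking the maximum of (end-offset + 1) over all returned segments so that duplicate or overlapping trailing entries cannot move the next log-start-offset backwards.
```java
import java.util.List;

// Hypothetical stand-in for the metadata entries returned by RLMM#listRemoteLogSegments.
record SegmentMetadata(long startOffset, long endOffset) {}

final class NextLogStartOffset {
    // The last entry in the list may be a duplicate/overlapping segment, so the
    // next log-start-offset is the maximum of (end-offset + 1) across all segments.
    static long compute(List<SegmentMetadata> segments, long currentLogStartOffset) {
        long next = currentLogStartOffset;
        for (SegmentMetadata segment : segments) {
            next = Math.max(next, segment.endOffset() + 1);
        }
        return next;
    }
}
```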
Reviewers: Satish Duggana <satishd@apache.org>
This commit implements KIP-899: Allow producer and consumer clients to rebootstrap. It introduces the new setting `metadata.recovery.strategy`, applicable to all client types.
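A hedged client-configuration sketch: the setting name comes from this commit, while the value `rebootstrap` (and the default `none`) is assumed here to match KIP-899.
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RebootstrapConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical cluster
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // KIP-899: assumed value that lets the client re-bootstrap from the configured
        // bootstrap servers when all known brokers become unavailable; "none" is assumed
        // to preserve the previous behaviour.
        props.put("metadata.recovery.strategy", "rebootstrap");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value")); // hypothetical topic
        }
    }
}
```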
Reviewers: Greg Harris <gharris1727@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
KAFKA-16570 FenceProducers API returns "unexpected error" when successful
* Client handling of ConcurrentTransactionsException as retriable
* Unit test
* Integration test
Reviewers: Chris Egerton <chrise@aiven.io>, Justine Olshan <jolshan@confluent.io>
This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces record coalescing in the CoordinatorRuntime. It also introduces a new configuration, `group.coordinator.append.linger.ms`, which allows administrators to choose the linger time or disable it with zero. The new configuration defaults to 10ms.
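For example (value illustrative), an administrator can disable coalescing entirely by setting `group.coordinator.append.linger.ms=0` on the brokers, while leaving the setting unset keeps the 10ms default.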
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
- Added the integration of the quota manager to throttle copy requests to the remote storage. Reference KIP-956
- Added unit-tests for the copy throttling logic.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reasons, but it turned out this could cause a corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. when the OS shuts down while the kernel is writing dirty pages back to the device.
To address this problem, this PR makes the following changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent
Reviewers: Jun Rao <junrao@gmail.com>
This patch implements the handling of `includeAuthorizedOperations` flag in the ConsumerGroupDescribe API.
Reviewers: David Jacot <djacot@confluent.io>
KAFKA-16606 (#15834) introduced a change that broke
ReassignPartitionsCommandTest.testReassignmentCompletionDuringPartialUpgrade.
The point was to validate that the MetadataVersion supports JBOD
in KRaft when multiple log directories are configured.
We do that by checking the version used in
kafka-features.sh upgrade --metadata, and the version discovered
via a FeatureRecord for metadata.version in the cluster metadata.
There's no point in checking inter.broker.protocol.version in
KafkaConfig, since in KRaft, that configuration is deprecated
and ignored — always assuming the value of MINIMUM_KRAFT_VERSION.
The test that was broken sets inter.broker.protocol.version in
KRaft mode and configures 3 directories. So alternatively, we
could change the test to not configure this property.
Since the property isn't forbidden in KRaft mode, just ignored,
and operators may forget to remove it, it seems better to remove
the fail condition in KafkaConfig.
Reviewers: Luke Chen <showuon@gmail.com>
Implement the add voter, remove voter, and update voter RPCs for
KIP-853. This is just adding the RPC handling; the current
implementation in RaftManager just throws UnsupportedVersionException.
Reviewers: Andrew Schofield <aschofield@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Conflicts: Fix some conflicts caused by the lack of KIP-932 RPCs in 3.8.
Support for multiple log directories in KRaft exists from
MetadataVersion 3.7-IV2.
When migrating a ZK broker to KRaft, we already check that
the IBP is high enough before allowing the broker to startup.
With KIP-584 and KIP-778, Brokers in KRaft mode do not require
the IBP configuration - the configuration is deprecated.
In KRaft mode inter.broker.protocol.version defaults to
MetadataVersion.MINIMUM_KRAFT_VERSION (IBP_3_0_IV1).
Instead KRaft brokers discover the MetadataVersion by reading
the "metadata.version" FeatureLevelRecord from the cluster metadata.
This change adds a new configuration validation step upon discovering
the "metadata.version" from the cluster metadata.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Improve consistency and correctness for user-provided timeouts at the Consumer network request layer, per the Java client Consumer timeouts design (https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts). While the changes introduced in KAFKA-15974 enforce timeouts at the Consumer's event layer, this change enforces timeouts at the network request layer.
The changes mostly fit into the following areas:
1. Create shared code and idioms so timeout handling logic is consistent across current and future RequestManager implementations
2. Use deadlineMs instead of expirationMs, expirationTimeoutMs, retryExpirationTimeMs, timeoutMs, etc. (see the sketch after this list)
3. Update "preemptive pruning" to remove expired requests that have had at least one attempt
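A minimal sketch (hypothetical class, not the actual implementation) of the deadlineMs idiom referenced in item 2: the user-supplied timeout is converted into an absolute deadline once, and every subsequent retry or expiration check compares against that single deadline.
```java
import org.apache.kafka.common.utils.Time;

// Hypothetical helper illustrating the idiom; the real request managers share
// equivalent logic rather than using this exact class.
final class DeadlineSketch {
    private final long deadlineMs;

    DeadlineSketch(Time time, long timeoutMs) {
        // Convert the relative timeout into an absolute deadline once, up front.
        this.deadlineMs = time.milliseconds() + timeoutMs;
    }

    boolean isExpired(long currentTimeMs) {
        return currentTimeMs >= deadlineMs;
    }

    long remainingMs(long currentTimeMs) {
        return Math.max(0L, deadlineMs - currentTimeMs);
    }
}
```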
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
When starting up the Kafka LogManager, we check for stray replicas to avoid some corner cases. But this check might prevent the broker from starting up if partition.metadata is missing, because on startup we load the log from file and the log's topicId comes from the partition.metadata file. So, if partition.metadata is missing, the topicId will be None and LogManager#isStrayKraftReplica will fail with a "no topicID" error.
The partition.metadata file could be missing due to a storage failure, or due to an unclean shutdown after the topic is created on the replica but before the data is flushed to the partition.metadata file. This is possible because the flush is done asynchronously.
When finding a log without topicID, we should treat it as a stray log and then delete it.
Reviewers: Luke Chen <showuon@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
During online migration, there can be a ConsumerGroup with members that use the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown during ConsumerGroup offset fetch/commit validation, but that error is not supported by the classic protocol. Thus this patch changes `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
Allow KRaft replicas to send requests to any node (Node), not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.bootstrap.servers configuration, send requests to the dynamically changing set of
voters, and send requests to the leader endpoint (Node) discovered through the KRaft RPCs
(specifically the BeginQuorumEpoch request and Fetch response).
This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.
The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.
The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequest.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.
The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this commit the node value is computed from the set of voters. In a
future commit this will be updated so that it is sent through the KRaft RPCs, for example the
BeginQuorumEpoch request and Fetch response.
Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.
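For illustration (host/port values are made up): a broker that was previously configured with the static voter set, e.g. `controller.quorum.voters=1@localhost:9093`, can instead be pointed only at bootstrap endpoints with `controller.quorum.bootstrap.servers=localhost:9093`, from which the current set of voters is then discovered.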
Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a directory assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to an
UnwritableMetadataException in PartitionRegistration#toRecord.
Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.
Reviewers: Luke Chen <showuon@gmail.com>
This patch fixes a typo in MetadataVersion.IBP_4_0_IV0. It should be 0 not O.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration, so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced.
This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targeted than the original code.
Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state, rather than clearing those. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Igor Soarez <soarez@apple.com>
This patch introduces the `group.version` feature flag with one version:
1) Version 1 enables the new consumer group rebalance protocol (KIP-848).
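A hedged sketch of enabling the flag programmatically, assuming the standard finalized-features Admin API (KIP-584) applies to `group.version` the same way it does to other features; kafka-features.sh could equally be used.
```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class EnableGroupVersion {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical cluster
        try (Admin admin = Admin.create(props)) {
            // Finalize group.version at version 1 to enable the KIP-848 rebalance protocol.
            admin.updateFeatures(
                Map.of("group.version", new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
                new UpdateFeaturesOptions()
            ).all().get();
        }
    }
}
```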
Reviewers: Justine Olshan <jolshan@confluent.io>
Add more unit tests for LogSegments and do some small refactoring in LogSegments.java.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
As per KIP-1022, we will rename the unstable metadata versions enabled config to support all feature versions.
Features is also updated to return the latest production and latest testing versions of each feature.
A feature is production ready when the corresponding metadata version (bootstrapMetadataVersion) is production ready.
Added tests for the feature usage of the unstableFeatureVersionsEnabled config.
Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
While working on https://github.com/apache/kafka/pull/16120, I noticed that the transaction verification feature is disabled in `TransactionsTest` when the new group coordinator is enabled. We did this initially because the feature was not available in the new group coordinator but we fixed it a long time ago. We can enable it now.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch exposes the group coordinator config `CONSUMER_GROUP_MIGRATION_POLICY_CONFIG`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This PR does the following:
- System tests can bring up the Kafka broker in native mode.
- System tests can run against the Kafka broker in native mode.
- Extracts the native build command so that it can be reused.
- Allows system tests to run on a native Kafka broker using the Docker mechanism.
To run system tests by bringing up Kafka in native mode, pass kafka_mode as native in the ducktape globals:
--globals '{\"kafka_mode\":\"native\"}'
To run system tests by bringing up Kafka in native mode via the Docker mechanism:
_DUCKTAPE_OPTIONS="--globals '{\"kafka_mode\":\"native\"}'" TC_PATHS="tests/kafkatest/tests/" bash tests/docker/run_tests.sh
To bring up only the ducker nodes for native Kafka:
bash tests/docker/ducker-ak up -m native
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Added the implementation of the quota manager that will be used to throttle copy and fetch requests from the remote storage. Reference KIP-956
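(For context, KIP-956 defines per-broker byte-rate quota configurations for these paths, such as `remote.log.manager.copy.max.bytes.per.second` and `remote.log.manager.fetch.max.bytes.per.second`; the names here are taken from the KIP rather than from this patch.)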
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Jun Rao <junrao@gmail.com>
This patch disallows enabling the new consumer rebalance protocol in ZK mode.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
In BrokerServer.scala, brokerMetadataPublishers are configured, but when metadata updates arrive, remoteLogManager has not been configured yet.
For example, remoteLogManager.foreach(rlm => rlm.onLeadershipChange(partitionsBecomeLeader.asJava, partitionsBecomeFollower.asJava, topicIds)) in ReplicaManager is invoked after the publishers are instantiated, and at that point rlm needs its relevant managers configured.
This change makes sure rlm is configured before the brokerMetadataPublishers are initialized.
Reviewers: Luke Chen <showuon@gmail.com>, Nikhil Ramakrishnan <nikrmk@amazon.com>
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.
I've also added the --feature flag to the storage tool to show how it will be used.
Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.
I will add the unstable config and tests in a followup.
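A hypothetical invocation of the new storage-tool flag (the config path, cluster id, and feature name/level are illustrative; TestFeatureVersion is the test-only feature added here):
bin/kafka-storage.sh format --config config/kraft/server.properties --cluster-id <cluster-id> --feature test.feature.version=1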
Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.
This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.
Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.
Reviewers: Apoorv Mittal <amittal@confluent.io>, David Jacot <djacot@confluent.io>