This patch adds support for filtering groups by type (Classic or Consumer) to both the old and the new group coordinator.
Reviewers: David Jacot <djacot@confluent.io>
In KRaft mode, the broker fails to handle topic recreation correctly when a disk is broken. This is because ReplicaManager tracks HostedPartitions that live on an offline disk without associating topic ID information with them.
This change updates HostedPartition.Offline to carry the topic ID. We also update the log creation logic in Partition::createLogInAssignedDirectoryId so that it no longer relies solely on targetLogDirectoryId == DirectoryId.UNASSIGNED to determine whether the log to be created is "new".
Please refer to the comments in https://issues.apache.org/jira/browse/KAFKA-16157 for more information.
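For illustration only, here is a minimal Java sketch of the idea; the real code is the Scala HostedPartition hierarchy in ReplicaManager, and the names below are made up:
```java
import java.util.Optional;
import java.util.UUID;

// Illustrative stand-in for HostedPartition.Offline: the offline marker now remembers
// which topic ID the inaccessible on-disk log belonged to.
record OfflinePartition(String topicPartition, Optional<UUID> topicId) {

    // If a creation request for the same partition name arrives with a different topic ID,
    // the topic was deleted and re-created, so the log to be created really is "new".
    boolean isRecreation(UUID incomingTopicId) {
        return topicId.map(id -> !id.equals(incomingTopicId)).orElse(false);
    }
}
```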
Reviewers: Luke Chen <showuon@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
While migrating from ZK mode to KRaft mode, the broker passes through a "hybrid" phase, in which it
receives LeaderAndIsrRequest and UpdateMetadataRequest RPCs from the KRaft controller. For the most
part, these RPCs can be handled just like their traditional equivalents from a ZK-based controller.
However, there is one thing that is different: the way topic deletions are handled.
In ZK mode, there is a "deleting" state which topics enter prior to being completely removed.
Partitions stay in this state until they are removed from the disks of all replicas. And partitions
associated with these deleting topics show up in the UMR and LAIR as having a leader of -2 (which
is not a valid broker ID, of course, because it's negative). When brokers receive these RPCs, they
know to remove the associated partitions from their metadata caches and disks. When a full UMR or
LAIR is sent, deleting partitions are included as well.
In hybrid mode, in contrast, there is no "deleting" state. Topic deletion happens immediately. We
can do this because we know that we have topic IDs that are never reused. This means that we can
always tell the difference between a broker that had an old version of some topic, and a broker
that has a new version that was re-created with the same name. To make this work, when handling a
full UMR or LAIR, hybrid brokers must compare the full state that was sent over the wire to their
own local state, and adjust accordingly.
Prior to this PR, the code for handling those adjustments had several major flaws. The biggest flaw
is that it did not correctly handle the "re-creation" case where a topic named FOO appears in the
RPC, but with a different ID than the broker's local FOO. Another flaw is that a problem with a
single partition would prevent handling the whole request.
In ZkMetadataCache.scala, we handle full UMR requests from KRaft controllers by rewriting the UMR
so that it contains the implied deletions. I fixed this code so that deletions always appear at the
start of the list of topic states. This is important for the re-creation case since it means that a
single request can both delete the old FOO and add a new FOO to the cache. Also, rather than
modifying the request in-place, as the previous code did, I build a whole new request with the
desired list of topic states. This is much safer because it avoids unforeseen interactions with
other parts of the code that deal with requests (like request logging). While this new copy may
sound expensive, it should actually not be: we are doing a "shallow copy" which references the
topic state entries of the previous list.
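Conceptually, the rewrite boils down to something like the following sketch, where TopicState is a placeholder for the UMR topic-state entries (the actual code is in ZkMetadataCache.scala):
```java
import java.util.ArrayList;
import java.util.List;

final class UmrRewriteSketch {
    record TopicState(String name, Object payload) { } // placeholder for a UMR topic state entry

    // Build a brand-new list rather than mutating the request in place; the entries
    // themselves are shared (shallow copy), so this is cheap.
    static List<TopicState> withImpliedDeletionsFirst(List<TopicState> impliedDeletions,
                                                      List<TopicState> fromRequest) {
        List<TopicState> combined = new ArrayList<>(impliedDeletions.size() + fromRequest.size());
        combined.addAll(impliedDeletions); // deletions first: delete the old FOO before adding the new FOO
        combined.addAll(fromRequest);
        return combined;
    }
}
```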
I also reworked ZkMetadataCache.updateMetadata so that if a partition is re-created, it does not
appear in the returned set of deleted TopicPartitions. Since this set is used only by the group
manager, this seemed appropriate. (If I was in the consumer group for the previous iteration of
FOO, I should still be in the consumer group for the new iteration.)
On the ReplicaManager.scala side, we handle full LAIR requests by treating anything which does not
appear in them as a "stray replica." (But we do not rewrite the request objects as we do with UMR.)
I moved the logic for finding stray replicas from ReplicaManager into LogManager. It makes more
sense there, since the information about what is on-disk is managed in LogManager. Also, the stray
replica detection logic for KRaft mode is there, so it makes sense to put the stray replica
detection logic for hybrid mode there as well.
Since the stray replica detection is now in LogManager, I moved the unit tests there as well.
Previously some of those tests had been in BrokerMetadataPublisherTest for historical reasons.
The main advantage of the new LAIR logic is that it takes topic ID into account. A replica can be a
stray even if the LAIR contains a topic of the given name, but a different ID. I also moved the
stray replica handling earlier in the becomeLeaderOrFollower function, so that we could correctly
handle the "delete and re-create FOO" case.
Reviewers: David Arthur <mumrah@gmail.com>
In KRaft mode, or on ZK brokers that are migrating to KRaft, we have a local __cluster_metadata
log. This log is stored in a single log directory which is configured via metadata.log.dir. If
there is no metadata.log.dir given, it defaults to the first entry in log.dirs. In the future we
may support multiple metadata log directories, but we don't yet. For now, we must abort the
process when this log directory fails.
In ZK mode, it is not necessary to abort the process when this directory fails, since there is no
__cluster_metadata log there. This PR changes the logic so that we check for whether we're in ZK
mode and do not abort in that scenario (unless we lost the final remaining log directory, of
course).
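A hedged sketch of the resulting decision, with illustrative parameter names rather than the actual method signature:
```java
final class LogDirFailureSketch {
    // Halt only if the failed directory hosts the __cluster_metadata log (KRaft or
    // migrating brokers), or if it was the last remaining online log directory.
    static boolean shouldHaltBroker(boolean hasClusterMetadataLog,
                                    String failedDir,
                                    String metadataLogDir,
                                    int remainingOnlineLogDirs) {
        if (hasClusterMetadataLog && failedDir.equals(metadataLogDir)) {
            return true;
        }
        return remainingOnlineLogDirs == 0;
    }
}
```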
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Proven Provenzano <pprovenzano@confluent.io>
During migration from ZK mode to KRaft mode, there is a step where the kcontrollers load all of the
data from ZK into the metadata log. Previously, we were using a batch size of 1000 for this, but
200 seems better. This PR also adds an internal configuration to control this batch size, for
testing purposes.
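The batching itself is just chunking the migrated records, roughly along these lines (a sketch; the internal config that controls the size is not shown here):
```java
import java.util.ArrayList;
import java.util.List;

final class MigrationBatchingSketch {
    // Split the records read from ZK into batches of at most batchSize (now 200 by default)
    // before committing them to the metadata log.
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            batches.add(records.subList(start, Math.min(start + batchSize, records.size())));
        }
        return batches;
    }
}
```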
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch extends the Admin client to support describing new consumer groups with the ConsumerGroupDescribe API introduced in KIP-848. Users continue to use the `Admin#describeConsumerGroups` API; the admin client does all the magic. Basically, the admin client always starts by trying to describe the requested groups with the ConsumerGroupDescribe API. If all the groups are resolved, the job is done. If some groups remain unresolved due to an UNSUPPORTED_VERSION or GROUP_ID_NOT_FOUND error, the admin client retries them with the DescribeGroups API. The patch also adds fields to the data structure returned by `Admin#describeConsumerGroups`, as stated in the KIP.
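Conceptually, the fallback looks like the sketch below; describeWithConsumerGroupApi and describeWithClassicApi are hypothetical stand-ins for the admin client's internal request drivers:
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

abstract class DescribeGroupsFallbackSketch {
    // Hypothetical helpers standing in for the admin client's internal request drivers.
    // Each entry's value is either a group description or the Throwable that failed it.
    abstract Map<String, Object> describeWithConsumerGroupApi(List<String> groupIds);
    abstract Map<String, Object> describeWithClassicApi(List<String> groupIds);

    Map<String, Object> describeAll(List<String> groupIds) {
        // Always start with the new ConsumerGroupDescribe API.
        Map<String, Object> results = new HashMap<>(describeWithConsumerGroupApi(groupIds));
        // Groups that failed with UNSUPPORTED_VERSION or GROUP_ID_NOT_FOUND are retried
        // with the classic DescribeGroups API.
        List<String> retryWithClassicApi = new ArrayList<>();
        for (Map.Entry<String, Object> entry : results.entrySet()) {
            if (entry.getValue() instanceof UnsupportedVersionException
                    || entry.getValue() instanceof GroupIdNotFoundException) {
                retryWithClassicApi.add(entry.getKey());
            }
        }
        if (!retryWithClassicApi.isEmpty()) {
            results.putAll(describeWithClassicApi(retryWithClassicApi));
        }
        return results;
    }
}
```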
Reviewers: Andrew Schofield <aschofield@confluent.io>, Bruno Cadonna <bruno@confluent.io>
* MINOR: Clean up core server classes
Mark methods and fields private where possible
Annotate public methods and fields
Remove unused classes and methods
Make sure Arrays are not printed with .toString
Optimize minor warnings
Remove unused apply method
Signed-off-by: Josep Prat <josep.prat@aiven.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This patch removes the extra hop via the request thread when the new group coordinator verifies a transaction. Prior to this patch, the ReplicaManager would automatically re-schedule the callback onto a request thread. However, the new group coordinator does not need this because it already schedules the write onto its own thread. With this patch, the decision of whether to re-schedule onto a request thread is left to the caller.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
We update the metadata update handler to resend the broker registration when the
metadata version has been updated to 3.7-IV2 or later, so that the controller becomes
aware of the log directories in the broker.
We also update DirectoryId::isOnline to return true on an empty list of
log directories while the controller awaits broker registration.
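A minimal sketch of the adjusted check (not the exact DirectoryId implementation):
```java
import java.util.List;
import org.apache.kafka.common.Uuid;

final class DirectoryOnlineSketch {
    // Until the broker re-registers, the controller knows of no log directories for it;
    // an empty list therefore means "assume online" rather than "offline".
    static boolean isOnline(Uuid dir, List<Uuid> onlineDirs) {
        if (onlineDirs.isEmpty()) {
            return true;
        }
        return onlineDirs.contains(dir);
    }
}
```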
Co-authored-by: Proven Provenzano <pprovenzano@confluent.io>
Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Because of the lack of implicit conversions, boolean properties need to be
passed as Strings.
This is already done in other parts of the code.
Signed-off-by: Josep Prat <josep.prat@aiven.io>
When a broker is down, and a topic is deleted, this will result in that broker seeing "stray
replicas" the next time it starts up. These replicas contain data that used to be important, but
which now needs to be deleted. Stray replica deletion is handled during the initial metadata
publishing step on the broker.
Previously, we deleted these stray replicas after starting up BOTH LogManager and ReplicaManager.
However, this wasn't quite correct. The presence of the stray replicas confused ReplicaManager.
Instead, we should delete the stray replicas BEFORE starting ReplicaManager.
This bug triggered when a topic was deleted and re-created while a broker was down, and some of the
replicas of the re-created topic landed on that broker. The impact was that the stray replicas were
deleted, but the new replicas for the next iteration of the topic never got created. This, in turn,
led to persistent under-replication until the next time the broker was restarted.
Reviewers: Luke Chen <showuon@gmail.com>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
This patch causes the active KRaftMigrationDriver to reload the /migration ZK state after electing
itself as the leader in ZK. This closes a race condition where the previous active controller could
make an update to /migration after the new leader was elected. The update race was not actually a
problem regarding the data since both controllers would be syncing the same state from KRaft to ZK,
but the change to the znode causes the new controller to fail on the zk version check on
/migration.
This patch also fixes an as-yet-unseen bug where an active controller that failed to elect itself
via claimControllerLeadership would not retry.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
We need to make sure to call the consumer interceptor and test its integration.
This adds the required calls in commitSync and commitAsync. The calls in commitAsync are executed using the same mechanism as commit callbacks, to ensure that the interceptors are invoked from a single thread, as intended in the original KIP.
The interceptors also need to be invoked on auto-commits, which are executed in the commit request manager. For this purpose, we share the OffsetCommitCallbackInvoker class with the background thread (it is already accessed implicitly from the background thread through a future lambda). This is done analogously to the RebalanceListenerInvoker.
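A simplified sketch of the single-thread hand-off; the type and method names are illustrative rather than the actual OffsetCommitCallbackInvoker API:
```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class InterceptorInvokerSketch<K, V> {
    private final ConsumerInterceptor<K, V> interceptor;
    // Completed commits are enqueued by whichever thread finished them (including the
    // background thread's auto-commits) ...
    private final Queue<Map<TopicPartition, OffsetAndMetadata>> completedCommits =
            new ConcurrentLinkedQueue<>();

    InterceptorInvokerSketch(ConsumerInterceptor<K, V> interceptor) {
        this.interceptor = interceptor;
    }

    void enqueue(Map<TopicPartition, OffsetAndMetadata> offsets) {
        completedCommits.add(offsets);
    }

    // ... and drained here, on the application thread, so interceptor.onCommit is only
    // ever called from a single thread, as the KIP intends.
    void maybeInvokeInterceptor() {
        Map<TopicPartition, OffsetAndMetadata> offsets;
        while ((offsets = completedCommits.poll()) != null) {
            interceptor.onCommit(offsets);
        }
    }
}
```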
Co-authored-by: John Doe zh2725284321@gmail.com
Reviewers: Bruno Cadonna <bruno@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Philip Nee <pnee@confluent.io>
When using --list --state, the currently accepted values correspond to the classic group type states. This patch adds the new states introduced by KIP-848. It also makes the matching on the server case insensitive.
Co-authored-by: d00791190 <dinglan6@huawei.com>
Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
In the BrokerTopicMetrics group, we provide not only per-topic metrics but also all-topic aggregated metric values. The bean names look like this (the first is the all-topic aggregate, the second the per-topic value):
kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments
kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments,topic=Leader
This PR adds the missing all-topic aggregated metric values for tiered storage, specifically for gauge-type metrics.
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>
I was using the ZERO_UUID topicId instead of the actual topicId in testFetchResponseContainsNewLeaderOnNotLeaderOrFollower, which was introduced in #14444. This updates the test to use the actual topicId, which is more correct.
Reviewers: Justine Olshan <jolshan@confluent.io>
This PR is part of #14471.
It contains a single test rewritten in Java.
The intention of the separate PR is to reduce the size of the change and simplify review.
Reviewers: Justine Olshan <jolshan@confluent.io>
I originally did some refactors in #14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:
* Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
* appendRecords has been simplified
* Removed unneeded error conversions in the verification code, since the group coordinator and produce paths convert errors differently; removed the test for that
* Fixed an incorrectly capitalized parameter name in KafkaRequestHandler
* Updated ReplicaManager test to handle produce appends separately when transactions are used.
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
* MINOR: Clean up core api, cluster, common, log, admin, controller and coordinator classes
Mark methods and fields private where possible
Annotate public methods and fields
Remove unused classes and methods
Signed-off-by: Josep Prat <josep.prat@aiven.io>
Reviewers: Luke Chen <showuon@gmail.com>
After PR #13107, an if-else block became unreachable. We need to remove it and keep the code clean.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>
This patch implements the new DescribeTopicPartitions RPC as defined in KIP-966 (ELR). Additionally, this patch adds a broker config "max.request.partition.size.limit" which limits the number of partitions returned by the new RPC.
Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
This ensures that no records are fetched, and no positions initialized, while the onPartitionsAssigned callback completes in the new async consumer's application thread. This is achieved using an internal mark in the subscription state, so that the partitions are not considered fetchable, or in need of position initialization, until the callback completes.
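A rough sketch of that internal mark, with illustrative names (the real flag lives in the consumer's subscription state):
```java
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

final class PendingCallbackSketch {
    // Partitions newly assigned but whose onPartitionsAssigned callback has not yet
    // completed. They are neither fetchable nor eligible for position initialization.
    private final Set<TopicPartition> awaitingCallback = new HashSet<>();

    void markPendingOnAssignedCallback(Set<TopicPartition> newlyAssigned) {
        awaitingCallback.addAll(newlyAssigned);
    }

    void onAssignedCallbackCompleted(Set<TopicPartition> partitions) {
        awaitingCallback.removeAll(partitions);
    }

    boolean isFetchable(TopicPartition tp) {
        return !awaitingCallback.contains(tp);
    }
}
```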
Reviewers: David Jacot <djacot@confluent.io>
KIP-714 requires a client instance cache in the broker, which should also have a time-based eviction policy so that client instances which are not actively sending metrics are evicted. The KIP states that this client-instance-specific state is maintained in broker memory for up to MAX(60*1000, PushIntervalMs * 3) milliseconds.
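In code form, the per-client-instance TTL is simply (sketch):
```java
import java.util.concurrent.TimeUnit;

final class TelemetryTtlSketch {
    // Client instance state is kept for MAX(60 * 1000, PushIntervalMs * 3) milliseconds
    // after the last activity, per the KIP.
    static long cacheTtlMs(long pushIntervalMs) {
        return Math.max(TimeUnit.SECONDS.toMillis(60), pushIntervalMs * 3);
    }
}
```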
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
This was originally #14489, which covered two aspects: reloading on partition epoch changes where the leader epoch did not change, and reloading when the leader epoch changed but we were already the leader.
I've cut out the second part of the change since the first part is much simpler.
This redefines the TopicDelta fields to better distinguish when a leader is elected (leader epoch bump) from when a leader has ISR/replica changes (partition epoch bump). There are some cases where we bump the partition epoch but not the leader epoch; in those cases we do not need to perform operations that only care about the leader epoch bump (i.e., onElect callbacks).
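A hedged sketch of the distinction, with illustrative names rather than the actual TopicDelta fields:
```java
final class EpochBumpSketch {
    // Illustrative snapshot of a partition before and after applying a metadata delta.
    record PartitionState(int leaderEpoch, int partitionEpoch) { }

    // A leader election always bumps the leader epoch; ISR or replica changes bump only
    // the partition epoch, and callbacks such as onElect should not fire for those.
    static boolean isNewLeaderElection(PartitionState before, PartitionState after) {
        return after.leaderEpoch() > before.leaderEpoch();
    }

    static boolean isIsrOrReplicaChangeOnly(PartitionState before, PartitionState after) {
        return after.partitionEpoch() > before.partitionEpoch()
                && after.leaderEpoch() == before.leaderEpoch();
    }
}
```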
Reviewers: Artem Livshits <alivshits@confluent.io>, José Armando García Sancio <jsancio@apache.org>
When transactional offset commits are eventually committed, we must always keep the most recently committed offset when we have a mix of transactional and regular offset commits. We achieve this by storing the offset of the offset commit record alongside the committed offset in memory. Without preserving the commit record offset, compaction of the __consumer_offsets topic itself may result in the wrong offset commit being materialized.
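A simplified sketch of the in-memory bookkeeping, with illustrative names:
```java
final class CommittedOffsetSketch {
    // The committed consumer offset plus the log offset of the commit record that wrote it.
    record CommittedOffset(long offset, long commitRecordOffset) { }

    // When a pending transactional commit becomes visible, only replace the current value
    // if its commit record is more recent; otherwise an older record could win after
    // compaction of __consumer_offsets.
    static CommittedOffset materialize(CommittedOffset current, CommittedOffset pending) {
        if (current == null || pending.commitRecordOffset() >= current.commitRecordOffset()) {
            return pending;
        }
        return current;
    }
}
```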
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk> , David Jacot <djacot@confluent.io>, Nikolay <NIzhikov@gmail.com>
This PR adds support for capturing the client socket port in the RequestContext. The port from the request context is used as matching criteria when filtering clients, and metrics plugins can fetch it from the request context.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
We reevaluated the integration tests that were disabled for the new consumer group protocol but
should be supported. The evaluation consisted of running the PlaintextConsumerTest suite ten times
and seeing which tests passed and which failed.
Based on that evaluation, the following tests can now be enabled:
testAutoCommitOnClose
testAutoCommitOnRebalance
testExpandingTopicSubscriptions
testMultiConsumerSessionTimeoutOnClose
testMultiConsumerSessionTimeoutOnStopPolling
testShrinkingTopicSubscriptions
The remaining tests failed consistently or frequently. For each of them, a dedicated Jira was
created to track and fix it:
testAutoCommitOnCloseAfterWakeup (KAFKA-16167)
testPerPartitionLagMetricsCleanUpWithSubscribe (local failure rate 100%, KAFKA-16150)
testPerPartitionLeadMetricsCleanUpWithSubscribe (local failure rate: 70%, KAFKA-16151)
testStaticConsumerDetectsNewPartitionCreatedAfterRestart (local failure rate: 100%, KAFKA-16152)
See KAFKA-16104 for more details.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The current load summary exposes the time from when the partition load operation is scheduled to when the load completes. It does not include how long the scheduled operation waits in the scheduler before running. Log that information as well.
Reviewers: David Jacot <djacot@confluent.io>
This PR adds a test case for follower fetch when segments are archived and expired from remote storage. This test case verifies the following scenario (from comment):
1. Leader is archiving to tiered storage and has a follower.
2. Follower has caught up to offset X (exclusive).
3. While the follower is offline, the leader moves X to tiered storage and expires data locally up to Y, such that Y = leaderLocalLogStartOffset and leaderLocalLogStartOffset > X. Meanwhile, X has also expired from tiered storage. Hence, X < globalLogStartOffset.
4. Follower comes online and tries to fetch X from leader.
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
Some kcontroller dynamic configurations may fail to apply at startup. This happens because there is
a race between registering the reconfigurables to the DynamicBrokerConfig class, and receiving the
first update from the metadata publisher. We can fix this by registering the reconfigurables first.
This seems to have been introduced by the "MINOR: Install ControllerServer metadata publishers
sooner" change.
Reviewers: Ron Dagostino <rdagostino@confluent.io>
This fixes an issue with the time boundaries used for the auto-commit performed when partitions are revoked.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>