The usual flow for updating the upgrade.html docs is to first make the change in apache/kafka/trunk, then cherry-pick it to the relevant release branch, and then copy it into the kafka-site repo.
It seems this was not done for a few commits updating the 3.6.1, 3.5.2 and 3.5.1 docs, resulting in kafka-site's latest upgrade.html containing content that isn't here. This was caught while we were adding the 3.7 upgrade docs.
This patch reconciles both files by taking the extra changes from kafka-site and placing them here. This was done by simply diffing the two files and taking the changes that apply.
This trivial PR makes it clear when it is the right time to switch from AclAuthorizer to StandardAuthorizer during the migration process.
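For context, a minimal sketch of the two settings involved (hypothetical helper class; the class names are the standard Kafka authorizer implementations, while the timing of the switch is exactly what the updated doc clarifies):

```java
// Illustrative sketch, not part of this PR: the authorizer.class.name values on each
// side of the migration. When to flip between them is what the doc change spells out.
import java.util.Properties;

public class AuthorizerConfigSketch {
    // ZK mode: the ZooKeeper-based authorizer.
    public static Properties zkModeAuthorizer() {
        Properties props = new Properties();
        props.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
        return props;
    }

    // KRaft mode: the metadata-log-based authorizer.
    public static Properties kraftModeAuthorizer() {
        Properties props = new Properties();
        props.put("authorizer.class.name",
            "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
        return props;
    }
}
```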
Reviewers: Luke Chen <showuon@gmail.com>
During the ZK to KRaft migration, before entering dual-write mode, the KRaft controller will send RPCs (i.e. UpdateMetadataRequest, LeaderAndIsrRequest, and StopReplicaRequest) to the brokers. Currently, the controller uses the inter-broker listener to send these RPCs to the brokers. But the doc didn't provide this info to users, because a normal KRaft controller won't use inter.broker.listener.name.
This PR adds the missing config to the ZK to KRaft migration doc.
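For illustration, a minimal sketch of a migration-phase controller configuration including the documented setting (listener names, ports, node IDs, and the ZK connect string are placeholders, not values taken from this PR):

```java
// Illustrative sketch of a KRaft controller config during the ZK migration.
// All addresses, IDs, and listener names below are placeholders.
import java.util.Properties;

public class MigrationControllerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("process.roles", "controller");
        props.put("node.id", "3000");
        props.put("controller.quorum.voters", "3000@localhost:9093");
        props.put("controller.listener.names", "CONTROLLER");
        props.put("listeners", "CONTROLLER://:9093");
        // Enable the migration and point the controller at the existing ZK ensemble.
        props.put("zookeeper.metadata.migration.enable", "true");
        props.put("zookeeper.connect", "localhost:2181");
        // The config this PR documents: which broker listener the controller uses when
        // sending UpdateMetadataRequest/LeaderAndIsrRequest/StopReplicaRequest to
        // ZK-mode brokers during the migration.
        props.put("inter.broker.listener.name", "PLAINTEXT");
        return props;
    }
}
```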
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Paolo Patierno <ppatierno@live.com>
The base eclipse-temurin:21-jre-alpine image got modified and had `bash` removed from it. This broke our build, since downstream steps using bash scripts depended on it. This patch explicitly installs bash.
Kafka Streams should not crash if a task is closed dirty. This is a
hotfix to catch/swallow an IllegalStateException from
`producer.abortTransaction()` on the dirty-close clean-up path.
A proper fix would be to not call `abortTransaction()` for this
particular case.
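For illustration, a minimal sketch of the swallow described above (hypothetical helper, not the actual Streams code):

```java
// Illustrative sketch: on the dirty-close path, aborting may race with a fenced or
// already-closed producer, so the IllegalStateException is logged and ignored instead
// of crashing the stream thread.
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;

final class DirtyCloseSketch {
    static void abortBestEffort(Producer<byte[], byte[]> producer, Logger log) {
        try {
            producer.abortTransaction();
        } catch (IllegalStateException swallowed) {
            log.warn("Ignoring IllegalStateException from abortTransaction() during dirty close",
                swallowed);
        }
    }
}
```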
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In KRaft mode, the broker fails to handle topic recreation correctly when a disk is broken. This is because ReplicaManager tracks HostedPartitions that are on an offline disk, but it doesn't associate TopicId information with them.
This change updates HostedPartition.Offline to associate topic id information. We also update the log creation logic in Partition::createLogInAssignedDirectoryId to not just rely on targetLogDirectoryId == DirectoryId.UNASSIGNED to determine if the log to be created is "new".
Please refer to the comments in https://issues.apache.org/jira/browse/KAFKA-16157 for more information.
Reviewers: Luke Chen <showuon@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
While migrating from ZK mode to KRaft mode, the broker passes through a "hybrid" phase, in which it
receives LeaderAndIsrRequest and UpdateMetadataRequest RPCs from the KRaft controller. For the most
part, these RPCs can be handled just like their traditional equivalents from a ZK-based controller.
However, there is one thing that is different: the way topic deletions are handled.
In ZK mode, there is a "deleting" state which topics enter prior to being completely removed.
Partitions stay in this state until they are removed from the disks of all replicas. And partitions
associated with these deleting topics show up in the UMR and LAIR as having a leader of -2 (which
is not a valid broker ID, of course, because it's negative). When brokers receive these RPCs, they
know to remove the associated partitions from their metadata caches and disks. When a full UMR or
LAIR is sent, deleting partitions are included as well.
In hybrid mode, in contrast, there is no "deleting" state. Topic deletion happens immediately. We
can do this because we know that we have topic IDs that are never reused. This means that we can
always tell the difference between a broker that had an old version of some topic, and a broker
that has a new version that was re-created with the same name. To make this work, when handling a
full UMR or LAIR, hybrid brokers must compare the full state that was sent over the wire to their
own local state, and adjust accordingly.
Prior to this PR, the code for handling those adjustments had several major flaws. The biggest flaw
is that it did not correctly handle the "re-creation" case where a topic named FOO appears in the
RPC, but with a different ID than the broker's local FOO. Another flaw is that a problem with a
single partition would prevent handling the whole request.
In ZkMetadataCache.scala, we handle full UMR requests from KRaft controllers by rewriting the UMR
so that it contains the implied deletions. I fixed this code so that deletions always appear at the
start of the list of topic states. This is important for the re-creation case since it means that a
single request can both delete the old FOO and add a new FOO to the cache. Also, rather than
modifying the request in place, as the previous code did, I build a whole new request with the
desired list of topic states. This is much safer because it avoids unforeseen interactions with
other parts of the code that deal with requests (like request logging). While this new copy may
sound expensive, it should actually not be. We are doing a "shallow copy" which references the
previous list of topic state entries.
I also reworked ZkMetadataCache.updateMetadata so that if a partition is re-created, it does not
appear in the returned set of deleted TopicPartitions. Since this set is used only by the group
manager, this seemed appropriate. (If I was in the consumer group for the previous iteration of
FOO, I should still be in the consumer group for the new iteration.)
On the ReplicaManager.scala side, we handle full LAIR requests by treating anything which does not
appear in them as a "stray replica." (But we do not rewrite the request objects as we do with UMR.)
I moved the logic for finding stray replicas from ReplicaManager into LogManager. It makes more
sense there, since the information about what is on-disk is managed in LogManager. Also, the stray
replica detection logic for KRaft mode is there, so it makes sense to put the stray replica
detection logic for hybrid mode there as well.
Since the stray replica detection is now in LogManager, I moved the unit tests there as well.
Previously some of those tests had been in BrokerMetadataPublisherTest for historical reasons.
The main advantage of the new LAIR logic is that it takes topic ID into account. A replica can be a
stray even if the LAIR contains a topic of the given name, but a different ID. I also moved the
stray replica handling earlier in the becomeLeaderOrFollower function, so that we could correctly
handle the "delete and re-create FOO" case.
Reviewers: David Arthur <mumrah@gmail.com>
In KRaft mode, or on ZK brokers that are migrating to KRaft, we have a local __cluster_metadata
log. This log is stored in a single log directory which is configured via metadata.log.dir. If
there is no metadata.log.dir given, it defaults to the first entry in log.dirs. In the future we
may support multiple metadata log directories, but we don't yet. For now, we must abort the
process when this log directory fails.
In ZK mode, it is not necessary to abort the process when this directory fails, since there is no
__cluster_metadata log there. This PR changes the logic so that we check whether we're in ZK
mode and do not abort in that scenario (unless we lost the final remaining log directory, of
course).
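For illustration, a small sketch of the metadata.log.dir defaulting described above (hypothetical helper; assumes log.dirs is set, which is the usual case):

```java
// Illustrative sketch, not the actual config code: metadata.log.dir wins if set;
// otherwise the first entry of log.dirs is used for the __cluster_metadata log.
import java.util.Properties;

final class MetadataLogDirSketch {
    static String metadataLogDir(Properties props) {
        String explicit = props.getProperty("metadata.log.dir");
        if (explicit != null && !explicit.isEmpty()) {
            return explicit;
        }
        // Fall back to the first configured data log directory (assumes log.dirs is set).
        return props.getProperty("log.dirs").split(",")[0].trim();
    }
}
```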
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Proven Provenzano <pprovenzano@confluent.io>
During migration from ZK mode to KRaft mode, there is a step where the kcontrollers load all of the
data from ZK into the metadata log. Previously, we were using a batch size of 1000 for this, but
200 seems better. This PR also adds an internal configuration to control this batch size, for
testing purposes.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This PR fixes a couple of things related to the #15193 PR.
When you complete the "Enter Migration Mode on the brokers" step, you are actually in the "Enabling the migration on the brokers" phase of the migration guide, and at that point the broker doesn't have a node.id yet but still a broker.id, so this PR removes a statement saying to replace the one with the other.
Also, during rollback it's not enough to just delete the /controller znode quickly after shutting down the controllers, because a controller election doesn't start until at least one broker is rolled back with the right configuration. Until that roll happens, and while the controllers are down, the brokers just log something like this even if you deleted the znode "quickly":
[2024-01-30 09:27:52,394] DEBUG [zk-broker-0-to-controller-quorum-channel-manager]: Controller isn't cached, looking for local metadata changes (kafka.server.BrokerToControllerRequestThread)
[2024-01-30 09:27:52,394] INFO [zk-broker-0-to-controller-quorum-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
You have to reduce the amount of time between deleting the znode and rolling at least one broker, so that an election can start.
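For illustration, a hypothetical way to perform that znode deletion with the plain ZooKeeper client (connection string and session timeout are placeholders); per the note above, roll at least one broker back right afterwards so an election can start:

```java
// Hypothetical helper (not a Kafka tool) using the plain ZooKeeper client to delete
// the /controller znode during a rollback. Connection string and timeout are placeholders.
import org.apache.zookeeper.ZooKeeper;

public class DeleteControllerZnode {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        try {
            // version -1 means "delete regardless of the znode's current version".
            zk.delete("/controller", -1);
        } finally {
            zk.close();
        }
    }
}
```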
Reviewers: Luke Chen <showuon@gmail.com>
We update the metadata update handler to resend the broker registration when the
metadata version has been updated to >= 3.7-IV2, so that the controller becomes
aware of the log directories in the broker.
We also update DirectoryId::isOnline to return true on an empty list of
log directories while the controller awaits broker registration.
Co-authored-by: Proven Provenzano <pprovenzano@confluent.io>
Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
When a broker is down, and a topic is deleted, this will result in that broker seeing "stray
replicas" the next time it starts up. These replicas contain data that used to be important, but
which now needs to be deleted. Stray replica deletion is handled during the initial metadata
publishing step on the broker.
Previously, we deleted these stray replicas after starting up BOTH LogManager and ReplicaManager.
However, this wasn't quite correct. The presence of the stray replicas confused ReplicaManager.
Instead, we should delete the stray replicas BEFORE starting ReplicaManager.
This bug triggered when a topic was deleted and re-created while a broker was down, and some of the
replicas of the re-created topic landed on that broker. The impact was that the stray replicas were
deleted, but the new replicas for the next iteration of the topic never got created. This, in turn,
led to persistent under-replication until the next time the broker was restarted.
Reviewers: Luke Chen <showuon@gmail.com>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
This PR fixes some bugs in the KRaft migration documentation and reorganizes it to be easier to read. (Specifically, there were some steps that were previously out of order.)
In order to keep it all straight, the revert documentation is now in the form of a table which maps the latest migration state to the actions which the system administrator should perform.
Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>, Liu Zeyu <zeyu.luke@gmail.com>, Paolo Patierno <ppatierno@live.com>
This patch causes the active KRaftMigrationDriver to reload the /migration ZK state after electing
itself as the leader in ZK. This closes a race condition where the previous active controller could
make an update to /migration after the new leader was elected. The update race was not actually a
problem regarding the data since both controllers would be syncing the same state from KRaft to ZK,
but the change to the znode causes the new controller to fail on the zk version check on
/migration.
This patch also fixes an as-yet-unseen bug where an active controller failing to elect itself via
claimControllerLeadership would not retry.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
In the BrokerTopicMetrics group, we provide not only per-topic metrics but also all-topic aggregated metric values. The bean names look like this:
kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments
kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments,topic=Leader
This PR adds the missing all-topic aggregated metric values for tiered storage, specifically for gauge-type metrics.
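For illustration, a hedged sketch of reading the new all-topic gauge over JMX (assumes a broker with JMX enabled on localhost:9999 and tiered storage in use; not part of this PR):

```java
// Illustrative sketch: query the all-topic aggregate bean shown above via JMX.
// The host/port are placeholders for wherever the broker exposes JMX.
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteCopyLagProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName allTopics = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments");
            // Yammer gauges exported over JMX expose their reading as the "Value" attribute.
            System.out.println("All-topic RemoteCopyLagSegments: "
                + mbs.getAttribute(allTopics, "Value"));
        } finally {
            connector.close();
        }
    }
}
```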
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>
Change `AbstractFetcher`/`Fetcher` to _not_ clear the `sessionHandlers` cache during `prepareCloseFetchSessionRequests()`.
During `close()`, `Fetcher` calls `maybeCloseFetchSessions()` which, in turn, calls `prepareCloseFetchSessionRequests()` and then calls `NetworkClient.poll()` to complete the requests. Since `prepareCloseFetchSessionRequests()` (erroneously) clears the `sessionHandlers` cache, the sessions are missing when the response is processed, and a warning is logged.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
We've released the fix, so I updated the note. We can backport to the 3.6 and 3.7 branches as well.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Divij Vaidya <diviv@amazon.com>
When there is only one voter, there will be no fetch requests from other voters. In this case, we should still not expire the checkQuorum timer, since the single voter on its own already constitutes a quorum.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, José Armando García Sancio <jsancio@apache.org>
KAFKA-15629 added the `TimestampedByteStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter`, which broke the restore
code path and thus some system tests.
This PR reverts this change for now.
Reviewers: Almog Gavra <almog.gavra@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Removed a debug log, because the next-time-to-update check runs in the poll loop and the log line was emitted excessively.
Reviewers: Qichao Chu <qichao@uber.com>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
Some kcontroller dynamic configurations may fail to apply at startup. This happens because there is
a race between registering the reconfigurables to the DynamicBrokerConfig class, and receiving the
first update from the metadata publisher. We can fix this by registering the reconfigurables first.
This seems to have been introduced by the "MINOR: Install ControllerServer metadata publishers
sooner" change.
Reviewers: Ron Dagostino <rdagostino@confluent.io>
When a replica is deleted, the unloading procedure of the coordinator is called with an empty leader epoch. However, the current implementation of the new group coordinator throws an exception in this case. My bad. This patch updates the logic to handle it correctly.
We discovered the bug in our testing environment. We will add a system test or an integration test in a subsequent patch to better exercise this path.
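For illustration, a hedged sketch of the intended handling (hypothetical method, not the coordinator's actual API):

```java
// Illustrative sketch: an empty leader epoch comes from replica deletion and should
// trigger the unload unconditionally instead of being treated as an error.
import java.util.OptionalInt;

final class UnloadSketch {
    static boolean shouldUnload(OptionalInt requestLeaderEpoch, int currentEpoch) {
        // Empty epoch => the replica was deleted; always unload.
        if (requestLeaderEpoch.isEmpty()) {
            return true;
        }
        // Otherwise only unload for an epoch at least as new as the current one.
        return requestLeaderEpoch.getAsInt() >= currentEpoch;
    }
}
```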
Reviewers: Justine Olshan <jolshan@confluent.io>
This fixes an issue with the time boundaries used for the auto-commit performed when partitions are revoked.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
When we are migrating from ZK mode to KRaft mode, the brokers pass through a phase where they are
running in ZK mode, but the controller is in KRaft mode (aka a kcontroller). This is called "hybrid
mode." In hybrid mode, the KRaft controllers send old-style controller RPCs to the remaining ZK
mode brokers. (StopReplicaRequest, LeaderAndIsrRequest, UpdateMetadataRequest, etc.)
To complete partition reassignment, the kcontroller must send a StopReplicaRequest to any brokers
that no longer host the partition in question. Previously, it was sending this StopReplicaRequest
with delete = false. This led to stray partitions, because the partition data was never removed as
it should have been. This PR fixes it to set delete = true. This fixes KAFKA-16120.
There is one additional problem with partition reassignment in hybrid mode, tracked as KAFKA-16121.
The issue is that in ZK mode, brokers ignore any LeaderAndIsr request where the partition leader
epoch is less than or equal to the current partition leader epoch. However, when in hybrid mode,
just as in KRaft mode, we do not bump the leader epoch when starting a new reassignment, see:
`triggerLeaderEpochBumpIfNeeded`. This PR resolves this problem by adding a special case on the
broker side when isKRaftController = true.
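For illustration, a hedged sketch of that special case (hypothetical helper, not the broker's actual code):

```java
// Illustrative sketch: when the request comes from a KRaft controller in hybrid mode,
// accept an equal leader epoch, since KRaft does not bump the epoch when a reassignment
// starts; in plain ZK mode the epoch must be strictly larger.
final class LeaderEpochCheckSketch {
    static boolean acceptLeaderAndIsr(int requestLeaderEpoch,
                                      int currentLeaderEpoch,
                                      boolean isKRaftController) {
        return isKRaftController
            ? requestLeaderEpoch >= currentLeaderEpoch
            : requestLeaderEpoch > currentLeaderEpoch;
    }
}
```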
Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Colin P. McCabe <cmccabe@apache.org>
Following @dajac's finding in #15063, I found we also create new RemoteLogManager instances in ReplicaManagerTest but don't close them.
While investigating ReplicaManagerTest, I also found other threads leaking (see the generic cleanup sketch after this list):
1. Remote fetch reaper threads: we create a real reaper thread in the test, which is not expected. We should create a mocked one like the other purgatory instances.
2. Throttle threads: we created a quotaManager to feed into the replicaManager but didn't close it. We already create a global quotaManager instance and close it in AfterEach, so we should re-use that one.
3. replicaManager and logManager were not closed after the test.
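As mentioned above, a generic sketch of the cleanup pattern (hypothetical field names, not the actual ReplicaManagerTest code):

```java
// Illustrative sketch: every component that spawns threads in a test must be closed in
// @AfterEach so the thread-leak check does not trip.
import java.io.Closeable;
import org.junit.jupiter.api.AfterEach;

class ThreadLeakFreeTestSketch {
    private Closeable replicaManagerLike;   // stands in for the replicaManager under test
    private Closeable logManagerLike;       // stands in for the logManager under test
    private Closeable sharedQuotaManagers;  // one shared instance, closed exactly once

    @AfterEach
    void tearDown() throws Exception {
        // Null checks in case a test failed before finishing setup.
        if (replicaManagerLike != null) replicaManagerLike.close();
        if (logManagerLike != null) logManagerLike.close();
        if (sharedQuotaManagers != null) sharedQuotaManagers.close();
    }
}
```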
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Satish Duggana <satishd@apache.org>, Justine Olshan <jolshan@confluent.io>
3.7 brokers must be able to register with 3.6 and earlier controllers. Currently, this is broken
because we will unconditionally try to set logDirs, but this field cannot be sent with
BrokerRegistrationRequest versions older than v2. This PR marks the logDirs field as "ignorable."
Marking the field as "ignorable" means that we will still be able to send the
BrokerRegistrationRequest even if the schema doesn't support logDirs.
Reviewers: Ron Dagostino <rdagostino@confluent.io>
Remove "group-rebalance-rate" and "group-rebalance-count" metrics from the new coordinator as this is not part of KIP-848.
Reviewers: David Jacot <djacot@confluent.io>
Migrates functionality provided by the utility into Kafka core. This wrapper will be used to generate property files and format storage when invoked from the docker container.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
The PR fixes the publishing of the kafka-clients artifact to remote Maven. The kafka-clients jar was recently shadowed, which would publish the artifacts to the local Maven repo successfully but would throw an error when publishing to remote Maven (as part of the release process).
The issue triggers only with publishMavenJavaPublicationToMavenRepository due to signing. Generating signed asc files errors out for shadowed release artifacts because the module name (clients) differs from the artifact name (kafka-clients).
The fix is basically to explicitly pass the shadowJar artifact to the signing and publish plugins. project.shadow.component(mavenJava) previously output the name as client-<version>-all.jar, even though the classifier and archivesBaseName are already defined correctly in :clients and in the shadowJar configuration.
* KAFKA-16077: Streams fails to close task after restoration when input partitions are updated
There is a race condition in the state updater that can cause the following:
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition
4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten)
5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized
This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.
To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it.
We should not remove the task twice, otherwise we'll end up in this situation:
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. The state updater moves the task from the updating tasks to the removed tasks
4. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back (adding a task+remove into the task and action queue) and add a pending action to close, revive and update input partitions
5. The task is handed back by the state updater. We close revive and update input partitions, and add the task back to the state updater
6. The state updater executes the "task+remove" action that is still in its task + action queue, and hands the task immediately back to the main thread
7. The main thread discovers a removed task that was not restored and has no pending action attached to it, which results in an IllegalStateException.
Reviewers: Bruno Cadonna <cadonna@apache.org>