This PR implements the second part of KIP-996 and KAFKA-16164 (tasks KAFKA-16607, KAFKA-17642, KAFKA-17643, KAFKA-17675) which encompass the response handling of PreVotes, addition of new ProspectiveState, update to metrics, and addition of Raft simulation tests.
Voters now transition to ProspectiveState first before CandidateState to prevent unnecessary epoch bumps. Voters in ProspectiveState send PreVotes requests which are Vote requests with PreVote set to true.
Follower grants PreVotes if it has not yet fetched successfully from leader. Leader denies all PreVotes. Unattached, Prospective, Candidate, and Resigned will grant PreVotes if the requesting replica's log is at least as long as theirs. Granted PreVotes are not persisted like standard votes. It is possible for a voter to grant several PreVotes in the same epoch.
The only state which is allowed to transition directly to CandidateState is ProspectiveState. This happens on reception of majority of granted PreVotes or if at least one voter doesn't support PreVote requests.
Prospective will transition to Follower after election loss/timeout if it was already aware of last known leader and the leader's endpoint, or at any point if it discovers the leader.
Prospective will transition to Unattached after election loss/timeout if it does not know the leader endpoints.
After electionTimeout, Resigned now always transitions to Unattached and increases the epoch.
Prospective grants standard votes if it has not already granted a standard vote (no votedKey), has no leaderId, and the recipient's log is current enough
Candidate no longer backs off after election timeout. Candidate still backs off after election loss.
Reviewers: José Armando García Sancio <jsancio@apache.org>
As per the discussion with @ijuma and @mumrah, the `share` module seems not required and it's advised to user `server` and `server-common` instead. The PR moves the classes from `share` module to respective server related modules.
Following has been refactored in the PR:
- Moved Share Fetch, Acknowledge, Session, Context and Cache related classes to `server` module as the classes are used by `core` and `tools` modules.
- Moved `Persister` releated classes from `share` to `server-common` as the Persister classes though currently just being used by `core` module but in [near future](https://github.com/apache/kafka/pull/17775) will also be used by `group-coordinator`. Hence the Persister classes shouldn't go in `server`. The debate is mostly between `coordinator-common` vs `server-common`. We have kept the Persister in `server-common` for now, the classes are more related to the server than the coordinator. Persister is basically an abstraction in the server to let you choose how you want to persist the share group progress.
- Updated build.gradle to remove `share` module.
- Removed `import-control-share.xml`
Reviewers: Ismael Juma <ismael@juma.me.uk>
Remove Apache ZooKeeper from the Apache Kafka build. Also remove commons IO, commons CLI, and netty, which were dependencies we took only because of ZooKeeper.
In order to keep the size of this PR manageable, I did not remove all classes which formerly interfaced with ZK. I just removed the ZK types. Fortunately, Kafka generally wrapped ZK data structures rather than using them directly.
Some classes were pretty entangled with ZK, so it was easier just to stub them out. For ZkNodeChangeNotificationListener.scala, PartitionStateMachine.scala, ReplicaStateMachine.scala, KafkaZkClient.scala, and ZookeeperClient.scala, I replaced all the functions with "throw new UnsupportedOperationException". Since the tests for these classes have been removed, as well as the ZK-based broker code, this should be OK as an incremental step.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
If ELR is enabled, we need to set a cluster-level min.insync.replicas, and remove all broker-level overrides. The reason for this is that if brokers disagree about which partitions are under min ISR, it breaks the KIP-966 replication invariants. In order to enforce this, when the eligible.leader.replicas.version feature is turned on, we automatically remove all broker-level min.insync.replicas overrides, and create the required cluster-level override if needed. Similarly, if the cluster was created with eligible.leader.replicas.version enabled, we create a similar cluster-level record. In both cases, we don't allow setting overrides for individual brokers afterwards, or removing the cluster-level override.
Split ActivationRecordsGeneratorTest up into multiple test cases rather than having it be one giant test case.
Fix a bug in QuorumControllerTestEnv where we would replay records manually on objects, racing with the active controller thread. Instead, we should simply ensure that the initial bootstrap records contains what we want.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This pull request replaces Log4j with Log4j2 across the entire project, including dependencies, configurations, and code. The notable changes are listed below:
1. Introduce Log4j2 Instead of Log4j
2. Change Configuration File Format from Properties to YAML
3. Adds warnings to notify users if they are still using Log4j properties, encouraging them to transition to Log4j2 configurations
Co-authored-by: Lee Dongjin <dongjin@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit implements the changes for KIP-1032. This updates Kafka to Jakarta specs, JavaEE 10 and Jetty 12. The changes here primarily effect Kafka Connect and MM2.
Todo/Notes:
1) I bumped the connect modules to JDK 17 but I also had to bump a couple other things that had a dependency on conect. The tools project depends on connect so that had to be bumped, and streams depends on tools so that needed to be bumped. This means we may need to separate some things if we don't want to enforce JDK 17 on streams.
2) There is an issue with a test in DedicatedMirrorIntegrationTest that I had to change for now that involves escaping characters and not quite sure what to do about it yet. The cause is the Servlet 6 spec changing what is allowed in the path. See: Jetty 12: 400: Ambiguous URI path encoding for path <%=FOO%>~1 (encoded: %3C%25%3DFOO%25%3E%7E1) jetty/jetty.project#11890
3) I had to configure the idle timeout in Jetty requests to match our request timeout so tests didn't fail. This was needed to fix the ConnectWorkerIntegrationTest#testPollTimeoutExpiry() test
Testing is being done by just using the existing tests for Connect and MM2 which should be sufficient.
Reviewers: Greg Harris <greg.harris@aiven.io>, David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch introduces the `--validate-regex` argument to the `kafka-consumer-group` command line tool as defined in KIP-848. The new argument allows the verification of RE2 regular expressions.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
Add git support for schema compatibility checker. Pulls in valid schema from remote git trunk branch to check with edited schema in local branch. Adds new option for command line verify-evolution-git which takes in a required file name.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Adds support for UpdateRaftVoterRequest in KafkaNetworkChannel. This addresses the following scenario:
* Bootstrap a KRaft Controller quorum in dynamic mode
* Start additional controllers (as observers)
* Update kraft.version feature from 0 to 1
* Use kafka-metadata-quorum add-controller to promote an observer controller to a follower
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Alyssa Huang <ahuang@confluent.io>
Implementation of https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code
- Introduces rebootstrap trigger interval config metadata.recovery.rebootstrap.trigger.ms, set to 5 minutes by default
- Makes rebootstrap the default for metadata.recovery.strategy
- Adds new error code REBOOTSTRAP_REQUIRED, introduces top-level error code in metadata response. On this error, clients rebootstrap.
- Configs apply to producers, consumers, share consumers, admin clients, Connect and KStreams clients.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
After the share.auto.offset.reset dynamic config was added for share groups in this commit - 9db5ed0, we needed to update this config value to "earliest" in ShareRoundTripWorker when it creates the consumer.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
As part of PR: https://github.com/apache/kafka/pull/17636 where purgatory has been moved from core to server-common hence move some existing classes used in Share Fetch to Share module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework
Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias Sax <mjsax@apache.org>
This patch does two things:
1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least of them must be set in the first request with epoch 0.
2) Validate the provided regular expression by compiling it.
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
Instead of waiting until Tasks are assigned to us, we pre-emptively
create a StandbyTask for each non-empty Task directory found on-disk.
We do this before starting any StreamThreads, and on our first
assignment (after joining the consumer group), we recycle any of these
StandbyTasks that were assigned to us, either as an Active or a
Standby.
We can't just use these "initial Standbys" as-is, because they were
constructed outside the context of a StreamThread, so we first have to
update them with the context (log context, ChangelogReader, and source
topics) of the thread that it has been assigned to.
The motivation for this is to (in a later commit) read StateStore
offsets for unowned Tasks from the StateStore itself, rather than the
.checkpoint file, which we plan to deprecate and remove.
There are a few additional benefits:
Initializing these Tasks on start-up, instead of on-assignment, will
reduce the time between a member joining the consumer group and beginning
processing. This is especially important when active tasks are being moved
over, for example, as part of a rolling restart.
If a Task has corrupt data on-disk, it will be discovered on startup and
wiped under EOS. This is preferable to wiping the state after being
assigned the Task, because another instance may have non-corrupt data and
would not need to restore (as much).
There is a potential performance impact: we open all on-disk Task
StateStores, and keep them all open until we have our first assignment.
This could require large amounts of memory, in particular when there are
a large number of local state stores on-disk.
However, since old local state for Tasks we don't own is automatically
cleaned up after a period of time, in practice, we will almost always
only be dealing with the state that was last assigned to the local
instance.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>, Matthias Sax <mjsax@apache.org>
Adds the DefaultStatePersister and other supporting classes for managing share state.
* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
This allows shutting down a KafkaClusterTestKit from a JVM shutdown hook without risking error logs because the base directory has already been deleted by the shutdown hook TestUtils.tempDirectory sets up.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In Kafka 4.0, by default the consumer uses the updated consumer group protocol defined in KIP-848. When the consumer is used with a cluster that does not support (or is not configured to use) the new protocol, the consumer will get an unfriendly error about unavailable APIs. Since this error could be the user's first impression when attempting to upgrade to 4.0, we need to make sure that the error is very clear about the remediation steps (set the group.protocol to CLASSIC on the client or upgrade and enable the new protocol on the cluster).
Reviewers: Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
As part of KIP-932, a new internal topic __share_group_state was introduced. There are 2 types of records which are currently being added in this topic - ShareSnapshotKey/Value and ShareUpdateKey/Value
In light of this, we must make the existing tooling like kafka-console-consumer and kafka-dump-log aware of these records for debugging and introspection purposes.
This PR introduces ShareGroupStateMessageFormatter to be used used with kafka-console-consumer and adds an internal class ShareGroupStateMessageParser in DumpLogSegments.scala.
Unit tests have been added to DumpLogSegmentsTest.scala
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
After MirrorMaker 1 removal, there are no other modules dependencies for these classes, so we can safely move them to tools module.
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Create a schema checker that can validate that later versions of a KRPC schema are compatible with earlier ones.
Reviewers: David Arthur <mumrah@gmail.com>