From 13d9a199f253dbd920bfce8f273aa8f82b0ad738 Mon Sep 17 00:00:00 2001
From: Mickael Maison
- Password config values that are dynamically updated are encrypted before storing in ZooKeeper. The broker config
- The secret used for password encoding may be rotated with a rolling restart of brokers. The old secret used for encoding
- passwords currently in ZooKeeper must be provided in the static broker config
- In Kafka 1.1.x, all dynamically updated password configs must be provided in every alter request when updating configs
- using
- For clusters using Zookeeper, liveness is determined indirectly through the existence of an ephemeral node which is created by the broker on
- initialization of its Zookeeper session. If the broker loses its session after failing to send heartbeats to Zookeeper before expiration of
-
We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" replicas,
which is known as the ISR. If either of these conditions fail to be satisfied, then the broker will be removed from the ISR. For example,
if a follower dies, then the controller will notice the failure through the loss of its session, and will remove the broker from the ISR.
@@ -624,7 +619,7 @@
Quota configuration may be defined for (user, client-id), user, and client-id groups. It is possible to override the default quota at any of the quota levels that need a higher (or even lower) quota.
The mechanism is similar to the per-topic log config overrides.
- User and (user, client-id) quota overrides are written to ZooKeeper under /config/users and client-id quota overrides are written under /config/clients.
+ User and (user, client-id) quota overrides are written to the metadata log.
These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See here for details.
Default quotas for each group may also be updated dynamically using the same mechanism.
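As an illustrative sketch (the host, port, principal names, and byte rates below are placeholders, not recommendations), a (user, client-id) quota override can be applied dynamically with kafka-configs.sh:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
    --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
    --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
The override takes effect on all brokers without a restart, matching the behaviour described above.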
-
Topic-level configurations and defaults are discussed in more detail below.
@@ -62,39 +63,12 @@
All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing).
If a config value is defined at different levels, the following order of precedence is used:
  Dynamic per-broker config stored in the metadata log
  Dynamic cluster-wide default config stored in the metadata log
  Static broker config from server.properties
  Kafka default, see broker configs
The essential configurations are the following:
  broker.id
+ node.id
  log.dirs
- zookeeper.connect
+ process.roles
+ controller.quorum.bootstrap.servers
-
- server.properties
- Updating Password Configs Dynamically
- password.encoder.secret must be configured in server.properties to enable dynamic update
- of password configs. The secret may be different on different brokers.
- password.encoder.old.secret and
- the new secret must be provided in password.encoder.secret. All dynamic password configs stored in ZooKeeper
- will be re-encoded with the new secret when the broker starts up.
- kafka-configs.sh even if the password config is not being altered. This constraint will be removed in
- a future release.
- Updating Password Configs in ZooKeeper Before Starting Brokers
-
- From Kafka 2.0.0 onwards, kafka-configs.sh enables dynamic broker configs to be updated using ZooKeeper before
- starting brokers for bootstrapping. This enables all password configs to be stored in encrypted form, avoiding the need for
- clear passwords in server.properties. The broker config password.encoder.secret must also be specified
- if any password configs are included in the alter command. Additional encryption parameters may also be specified. Password
- encoder configs will not be persisted in ZooKeeper. For example, to store SSL key password for listener INTERNAL
- on broker 0:
-
-
-
- $ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config
-   'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
- The configuration listener.name.internal.ssl.key.password will be persisted in ZooKeeper in encrypted
- form using the provided encoder configs. The encoder secret and iterations are not persisted in ZooKeeper.
-
Updating SSL Keystore of an Existing Listener
Brokers may be configured with SSL keystores with short validity periods to reduce the risk of compromised certificates.
Keystores may be updated dynamically without restarting the broker. The config name must be prefixed with the listener prefix listener.name.{listenerName}. so that only the keystore config of a specific listener is updated.
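For illustration only (the broker id, listener name, and keystore path/password are placeholders), a per-broker keystore rotation for listener INTERNAL could look roughly like:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter \
    --add-config 'listener.name.internal.ssl.keystore.location=/path/to/new-keystore.jks,listener.name.internal.ssl.keystore.password=keystore-password'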
@@ -157,14 +131,6 @@
log.message.timestamp.difference.max.ms
From Kafka version 2.0.0 onwards, unclean leader election is automatically enabled by the controller when the config unclean.leader.election.enable is dynamically updated.
- In Kafka version 1.1.x, changes to unclean.leader.election.enable take effect only when a new controller is elected.
- Controller re-election may be forced by running:
-
-
-
- $ bin/zookeeper-shell.sh localhost
- rmr /controller
Updating Log Cleaner Configs
Log cleaner configs may be updated dynamically at cluster-default level used by all brokers. The changes take effect
on the next iteration of log cleaning. One or more of these configs may be updated:
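For example, the cleaner thread count (one of the dynamically updatable cleaner configs) could be raised as a cluster-wide default; the host and value below are placeholders:
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
The new value is picked up on the next cleaning iteration, as noted above.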
diff --git a/docs/design.html b/docs/design.html
index f775552f4cf..a671e8285e4 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -334,11 +334,6 @@
sending periodic heartbeats to the controller. If the controller fails to receive a heartbeat before the timeout configured by
broker.session.timeout.ms expires, then the node is considered offline.
- zookeeper.session.timeout.ms, then the node gets deleted. The controller would then notice the node deletion through a Zookeeper watch
- and mark the broker offline.
-
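A minimal sketch of the broker-side settings involved (the values shown are illustrative; consult the broker configuration reference for the actual defaults):
# Heartbeat-based liveness between brokers and the controller
broker.heartbeat.interval.ms=2000
broker.session.timeout.ms=9000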
diff --git a/docs/implementation.html b/docs/implementation.html
index 93c9aa60c4c..25a7f60b18f 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -255,45 +255,6 @@ messageSetSend n
CoordinatorLoadInProgressException and the consumer may retry the OffsetFetchRequest after backing off.
- The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.
-
- When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a ZooKeeper znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world".
-
- /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
- This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) results in an error.
- Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
- /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
- Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.
-
- The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The cluster id can have a maximum of 22 characters and the allowed characters are defined by the regular expression [a-zA-Z0-9_\-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.
-
- Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker tries to get the cluster id from the /cluster/id znode during startup. If the znode does not exist, the broker generates a new cluster id and creates the znode with this cluster id.
-
- The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also registers the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
diff --git a/docs/migration.html b/docs/migration.html
deleted file mode 100644
index 95fc87ffaca..00000000000
--- a/docs/migration.html
+++ /dev/null
@@ -1,34 +0,0 @@
-
-
-
Kafka naturally batches data in both the producer and consumer so it can achieve high-throughput even over a high-latency connection. To allow this though it may be necessary to increase the TCP socket buffer sizes for the producer, consumer, and broker using the socket.send.buffer.bytes and socket.receive.buffer.bytes configurations. The appropriate way to set this is documented here.
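As a rough sketch (the 1 MB values are placeholders, not a recommendation), the broker-side buffers can be raised in server.properties:
# Larger TCP socket buffers for high-latency, high-bandwidth links
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576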
- It is generally not advisable to run a single Kafka cluster that spans multiple datacenters over a high-latency link. This will incur very high replication latency both for Kafka writes and ZooKeeper writes, and neither Kafka nor ZooKeeper will remain available in all locations if the network between locations is unavailable.
+ It is generally not advisable to run a single Kafka cluster that spans multiple datacenters over a high-latency link. This will incur very high replication latency for Kafka writes, and Kafka will not remain available in all locations if the network between locations is unavailable.
CreateTopicPolicy and AlterConfigPolicy (see KIP-108 and the settings create.topic.policy.class.name, alter.config.policy.class.name).
- All configurations are documented in the configuration section.
-
-# ZooKeeper
-zookeeper.connect=[list of ZooKeeper servers]
-
-# Log configuration
-num.partitions=8
-default.replication.factor=3
-log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
-
-# Other configurations
-broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
-listeners=[list of listeners]
-auto.create.topics.enable=false
-min.insync.replicas=2
-queued.max.requests=[number of concurrent requests]
-
- Our client configuration varies a fair amount between different use cases.
-
-
@@ -1274,7 +1240,7 @@ queued.max.requests=[number of concurrent requests]
All of the brokers in that cluster have a 90% GC pause time of about 21ms with less than 1 young GC per second.
You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
@@ -1380,7 +1346,7 @@ NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp
$ bin/kafka-server-start.sh server_properties
-
@@ -1723,17 +1689,6 @@ NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp
- With the release of Apache Kafka 3.5, Zookeeper is now marked deprecated. Removal of ZooKeeper is planned in the next major release of Apache Kafka (version 4.0),
- which is scheduled to happen no sooner than April 2024. During the deprecation phase, ZooKeeper is still supported for metadata management of Kafka clusters,
- but it is not recommended for new deployments. There is a small subset of features that remain to be implemented in KRaft - see current missing features for more information.
-
- Users are recommended to begin planning for migration to KRaft and also begin testing to provide any feedback. Refer to ZooKeeper to KRaft Migration for details on how to perform a live migration from ZooKeeper to KRaft and current limitations.
-
- The final 3.x minor release, that supports ZooKeeper mode, will receive critical bug fixes and security fixes for 12 months after its release.
-
If process.roles is set to broker, the server acts as a broker.
If process.roles is set to controller, the server acts as a controller.
If process.roles is set to broker,controller, the server acts as both a broker and a controller.
- If process.roles is not set at all, it is assumed to be in ZooKeeper mode.
Kafka servers that act as both brokers and controllers are referred to as "combined" servers. Combined servers are simpler to operate for small use cases like a development environment. The key disadvantage is that the controller will be less isolated from the rest of the system. For example, it is not possible to roll or scale the controllers separately from the brokers in combined mode. Combined mode is not recommended in critical deployment environments.
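For illustration, a minimal combined-mode sketch (the node id, host names, and ports are placeholders) could look like:
# Combined mode: this server acts as both broker and controller
process.roles=broker,controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT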
@@ -3758,7 +3648,7 @@ customized state stores; for built-in state stores, currently we have:
- In KRaft mode, specific Kafka servers are selected to be controllers (unlike the ZooKeeper-based mode, where any server can become the Controller). The servers selected to be controllers will participate in the metadata quorum. Each controller is either an active or a hot standby for the current active controller.
+In KRaft mode, specific Kafka servers are selected to be controllers. The servers selected to be controllers will participate in the metadata quorum. Each controller is either an active or a hot standby for the current active controller.
A Kafka admin will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures your system should withstand without availability impact. A majority of the controllers must be alive in order to maintain availability. With 3 controllers, the cluster can tolerate 1 controller failure; with 5 controllers, the cluster can tolerate 2 controller failures.
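In general, a quorum of N controllers requires floor(N/2) + 1 of them to be available and therefore tolerates floor((N-1)/2) failures; for example, N = 5 gives floor(4/2) = 2 tolerable failures, consistent with the figures above.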
@@ -3960,358 +3850,10 @@ foo
In order to migrate from ZooKeeper to KRaft you need to use a bridge release. The last bridge release is Kafka 3.9.
+ See the ZooKeeper to KRaft Migration steps in the 3.9 documentation.
- Before beginning the migration, the Kafka brokers must be upgraded to software version {{fullDotVersion}} and have the
- "inter.broker.protocol.version" configuration set to "{{dotVersion}}".
-
- It is recommended to enable TRACE level logging for the migration components while the migration is active. This can
- be done by adding the following log4j configuration to each KRaft controller's "log4j.properties" file.
-
- log4j.logger.org.apache.kafka.metadata.migration=TRACE
-
- It is generally useful to enable DEBUG logging on the KRaft controllers and the ZK brokers during the migration.
-
- Two things are needed before the migration can begin. First, the brokers must be configured to support the migration and second,
- a KRaft controller quorum must be deployed. The KRaft controllers should be provisioned with the same cluster ID as
- the existing Kafka cluster. This can be found by examining one of the "meta.properties" files in the data directories
- of the brokers, or by running the following command.
-
- $ bin/zookeeper-shell.sh localhost:2181 get /cluster/id
-
-
- The KRaft controller quorum should also be provisioned with the latest metadata.version.
- This is done automatically when you format the node with the kafka-storage.sh tool.
- For further instructions on KRaft deployment, please refer to the above documentation.
-
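As an illustrative example (the controller properties path below is an assumption made for this sketch), a controller node could be formatted with the existing cluster id roughly like:
$ bin/kafka-storage.sh format --cluster-id <existing-cluster-id> --config config/controller.properties
Formatting with kafka-storage.sh also writes the latest metadata.version, as noted above.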
- In addition to the standard KRaft configuration, the KRaft controllers will need to enable support for the migration
- as well as provide ZooKeeper connection configuration.
-
- Here is a sample config for a KRaft controller that is ready for migration:
-# Sample KRaft cluster controller.properties listening on 9093
-process.roles=controller
-node.id=3000
-controller.quorum.bootstrap.servers=localhost:9093
-controller.listener.names=CONTROLLER
-listeners=CONTROLLER://:9093
-
-# Enable the migration
-zookeeper.metadata.migration.enable=true
-
-# ZooKeeper client configuration
-zookeeper.connect=localhost:2181
-
-# The inter broker listener in brokers to allow KRaft controller send RPCs to brokers
-inter.broker.listener.name=PLAINTEXT
-
-# Other configs ...
-
- The new standalone controller in the example configuration above should be formatted using the bin/kafka-storage.sh format --standalone command.
- Note: The KRaft cluster node.id values must be different from any existing ZK broker broker.id.
- In KRaft mode, the brokers and controllers share the same Node ID namespace.
- Once the KRaft controller quorum has been started, the brokers will need to be reconfigured and restarted. Brokers
- may be restarted in a rolling fashion to avoid impacting cluster availability. Each broker requires the
- following configuration to communicate with the KRaft controllers and to enable the migration.
-
- broker.id is set to a non-negative integer even if broker.id.generation.enable is enabled (default is enabled). Additionally, ensure broker.id does not exceed reserved.broker.max.id to avoid failure.
-
- Here is a sample config for a broker that is ready for migration:
- -# Sample ZK broker server.properties listening on 9092
-broker.id=0
-listeners=PLAINTEXT://:9092
-advertised.listeners=PLAINTEXT://localhost:9092
-listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
-
-# Set the IBP
-inter.broker.protocol.version={{dotVersion}}
-
-# Enable the migration
-zookeeper.metadata.migration.enable=true
-
-# ZooKeeper client configuration
-zookeeper.connect=localhost:2181
-
-# KRaft controller quorum configuration
-controller.quorum.bootstrap.servers=localhost:9093
-controller.listener.names=CONTROLLER
-
- Note: Once the final ZK broker has been restarted with the necessary configuration, the migration will automatically begin.
- When the migration is complete, an INFO level log can be observed on the active controller:
-
- Completed migration of metadata from Zookeeper to KRaft
-
- Once the KRaft controller completes the metadata migration, the brokers will still be running
- in ZooKeeper mode. While the KRaft controller is in migration mode, it will continue sending
- controller RPCs to the ZooKeeper mode brokers. This includes RPCs like UpdateMetadata and
- LeaderAndIsr.
-
- To migrate the brokers to KRaft, they simply need to be reconfigured as KRaft brokers and restarted. Using the above
- broker configuration as an example, we would replace the broker.id with node.id and add
- process.roles=broker. It is important that the broker maintain the same Broker/Node ID when it is restarted.
- The zookeeper configurations should be removed at this point.
-
- If your broker has authorization configured via the authorizer.class.name property
- using kafka.security.authorizer.AclAuthorizer, this is also the time to change it
- to use org.apache.kafka.metadata.authorizer.StandardAuthorizer instead.
-
-# Sample KRaft broker server.properties listening on 9092
-process.roles=broker
-node.id=0
-listeners=PLAINTEXT://:9092
-advertised.listeners=PLAINTEXT://localhost:9092
-listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
-
-# Don't set the IBP, KRaft uses "metadata.version" feature flag
-# inter.broker.protocol.version={{dotVersion}}
-
-# Remove the migration enabled flag
-# zookeeper.metadata.migration.enable=true
-
-# Remove ZooKeeper client configuration
-# zookeeper.connect=localhost:2181
-
-# Keep the KRaft controller quorum configuration
-controller.quorum.bootstrap.servers=localhost:9093
-controller.listener.names=CONTROLLER
-
- - Each broker is restarted with a KRaft configuration until the entire cluster is running in KRaft mode. -
- -- Once all brokers have been restarted in KRaft mode, the last step to finalize the migration is to take the - KRaft controllers out of migration mode. This is done by removing the "zookeeper.metadata.migration.enable" - property from each of their configs and restarting them one at a time. -
-- Once the migration has been finalized, you can safely deprovision your ZooKeeper cluster, assuming you are - not using it for anything else. After this point, it is no longer possible to revert to ZooKeeper mode. -
- -# Sample KRaft cluster controller.properties listening on 9093
-process.roles=controller
-node.id=3000
-controller.quorum.bootstrap.servers=localhost:9093
-controller.listener.names=CONTROLLER
-listeners=CONTROLLER://:9093
-
-# Disable the migration
-# zookeeper.metadata.migration.enable=true
-
-# Remove ZooKeeper client configuration
-# zookeeper.connect=localhost:2181
-
-# Other configs ...
-
- While the cluster is still in migration mode, it is possible to revert to ZooKeeper mode. The process
- to follow depends on how far the migration has progressed. In order to find out how to revert,
- select the final migration step that you have completed in this table.
-
- Note that the directions given here assume that each step was fully completed, and they were
- done in order. So, for example, we assume that if "Enter Migration Mode on the Brokers" was
- completed, "Provisioning the KRaft controller quorum" was also fully completed previously.
-
- If you did not fully complete any step, back out whatever you have done and then follow revert
- directions for the last fully completed step.
| Final Migration Section Completed | Directions for Reverting | Notes |
|---|---|---|
| Preparing for migration | The preparation section does not involve leaving ZooKeeper mode. So there is nothing to do in the case of a revert. | |
| Provisioning the KRaft controller quorum | | |
| Enter Migration Mode on the brokers | | It is important to perform the zookeeper-shell.sh step quickly, to minimize the amount of time that the cluster lacks a controller. Until the /controller znode is deleted, you can also ignore any errors in the broker log about failing to connect to the KRaft controller. Those error logs should disappear after the second roll to pure ZooKeeper mode. |
| Migrating brokers to KRaft | | |
| Finalizing the migration | If you have finalized the ZK migration, then you cannot revert. | Some users prefer to wait for a week or two before finalizing the migration. While this requires you to keep the ZooKeeper cluster running for a while longer, it may be helpful in validating KRaft mode in your cluster. |
After building successfully, there should be a `kafka-storage-x.x.x-test.jar` file under `storage/build/libs`. Next, set the following configurations on the broker side to enable the tiered storage feature.
-# Sample Zookeeper/Kraft broker server.properties listening on PLAINTEXT://:9092
+# Sample KRaft broker server.properties listening on PLAINTEXT://:9092
remote.log.storage.system.enable=true
# Setting the listener for the clients in RemoteLogMetadataManager to talk to the brokers.
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 64a7a23c6b9..1ded73e2256 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -42,15 +42,11 @@ $ cd kafka_{{scalaVersion}}-{{fullDotVersion}}
Step 2: Start the Kafka environment
- NOTE: Your local environment must have Java 11+ installed.
+ NOTE: Your local environment must have Java 17+ installed.
- Apache Kafka can be started using KRaft or ZooKeeper. To get started with either configuration follow one of the sections below but not both.
+ Kafka can be run using local scripts and downloaded files or the docker image.
- Kafka with KRaft
-
- Kafka can be run using KRaft mode using local scripts and downloaded files or the docker image. Follow one of the sections below but not both to start the kafka server.
-
- Using downloaded files
+ Using downloaded files
Generate a Cluster UUID
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
@@ -63,7 +59,7 @@ $ cd kafka_{{scalaVersion}}-{{fullDotVersion}}
Once the Kafka server has successfully launched, you will have a basic Kafka environment running and ready to use.
-Get the Docker image:
$ docker pull apache/kafka:{{fullDotVersion}}
@@ -71,7 +67,7 @@ $ cd kafka_{{scalaVersion}}-{{fullDotVersion}}
Start the Kafka Docker container:
$ docker run -p 9092:9092 apache/kafka:{{fullDotVersion}}
- Get the Docker image:
$ docker pull apache/kafka-native:{{fullDotVersion}}
@@ -79,18 +75,6 @@ $ cd kafka_{{scalaVersion}}-{{fullDotVersion}}
Start the Kafka Docker container:
$ docker run -p 9092:9092 apache/kafka-native:{{fullDotVersion}}
- Run the following commands in order to start all services in the correct order:
-# Start the ZooKeeper service
-$ bin/zookeeper-server-start.sh config/zookeeper.properties
-
- Open another terminal session and run:
-# Start the Kafka broker service
-$ bin/kafka-server-start.sh config/server.properties
-
- Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
-Ctrl-C.
Ctrl-C.
-
@@ -330,7 +311,7 @@ wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.L
along the way, run the command:
-$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs
+ $ rm -rf /tmp/kafka-logs /tmp/kraft-combined-logs
- This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. However, if you have already started Kafka, feel free to skip the first two steps.
+ This tutorial assumes you are starting fresh and have no existing Kafka data. However, if you have already started Kafka, feel free to skip the first two steps.
@@ -96,30 +96,6 @@ $ cd kafka_{{scalaVersion}}-{{fullDotVersion}}
- Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration follow one of the sections below but not both.
-
- Run the following commands in order to start all services in the correct order:
-
- -$ bin/zookeeper-server-start.sh config/zookeeper.properties
-
- Open another terminal session and run:
-
- -$ bin/kafka-server-start.sh config/server.properties
-
-
Generate a Cluster UUID
@@ -325,7 +301,7 @@ Looking beyond the scope of this concrete example, what Kafka Streams is doing h
- You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server (if one was started) in order via Ctrl-C.
+You can now stop the console consumer, the console producer, the Wordcount application, and the Kafka broker in order via Ctrl-C.