mirror of https://github.com/apache/kafka.git
MINOR: Refresh of the docs (#16375)
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent 8199290500
commit c4a3d2475f
|
@ -43,21 +43,21 @@
|
|||
</ul>
|
||||
|
||||
To alter the current broker configs for broker id 0 (for example, the number of log cleaner threads):
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2</code></pre>
|
||||
|
||||
To describe the current dynamic broker configs for broker id 0:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe</code></pre>
|
||||
|
||||
To delete a config override and revert to the statically configured or default value for broker id 0 (for example,
|
||||
the number of log cleaner threads):
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads</code></pre>
|
||||
|
||||
Some configs may be configured as a cluster-wide default to maintain consistent values across the whole cluster. All brokers
|
||||
in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2</code></pre>
|
||||
|
||||
To describe the currently configured dynamic cluster-wide default configs:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe</code></pre>
|
||||
|
||||
All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing).
|
||||
If a config value is defined at different levels, the following order of precedence is used:
|
||||
|
@ -89,7 +89,7 @@
|
|||
encoder configs will not be persisted in ZooKeeper. For example, to store SSL key password for listener <code>INTERNAL</code>
|
||||
on broker 0:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config
|
||||
'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'</code></pre>
|
||||
|
||||
The configuration <code>listener.name.internal.ssl.key.password</code> will be persisted in ZooKeeper in encrypted
|
||||
|
@ -162,7 +162,7 @@
|
|||
In Kafka version 1.1.x, changes to <code>unclean.leader.election.enable</code> take effect only when a new controller is elected.
|
||||
Controller re-election may be forced by running:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/zookeeper-shell.sh localhost
|
||||
<pre><code class="language-bash">$ bin/zookeeper-shell.sh localhost
|
||||
rmr /controller</code></pre>
|
||||
|
||||
<h5>Updating Log Cleaner Configs</h5>
|
||||
|
@ -220,17 +220,17 @@
|
|||
<h3 class="anchor-heading"><a id="topicconfigs" class="anchor-link"></a><a href="#topicconfigs">3.2 Topic-Level Configs</a></h3>
|
||||
|
||||
Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
|
||||
<pre><code class="language-bash">$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
|
||||
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1</code></pre>
|
||||
Overrides can also be changed or set later using the alter configs command. This example updates the max message size for <i>my-topic</i>:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
|
||||
--alter --add-config max.message.bytes=128000</code></pre>
|
||||
|
||||
To check overrides set on the topic you can do
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe</code></pre>
|
||||
|
||||
To remove an override you can do
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
|
||||
--alter --delete-config max.message.bytes</code></pre>
|
||||
|
||||
The following are the topic-level configurations. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override.
|
||||
|
@ -294,9 +294,9 @@
|
|||
<h4><a id="org.apache.kafka.disallowed.login.modules"></a><a id="systemproperties_org.apache.kafka.disallowed.login.modules" href="#systemproperties_org.apache.kafka.disallowed.login.modules">org.apache.kafka.disallowed.login.modules</a></h4>
|
||||
<p>This system property is used to disable problematic login module usage in the SASL JAAS configuration. It accepts a comma-separated list of loginModule names. By default, the <b>com.sun.security.auth.module.JndiLoginModule</b> loginModule is disabled.
|
||||
<p>If users want to enable JndiLoginModule, they need to explicitly reset the system property as shown below. We advise users to validate configurations and only allow trusted JNDI configurations. For more details, see <a href="https://nvd.nist.gov/vuln/detail/CVE-2023-25194">CVE-2023-25194</a>.
|
||||
<p><pre class="brush: bash;"> -Dorg.apache.kafka.disallowed.login.modules=</pre>
|
||||
<p><pre><code class="language-bash">-Dorg.apache.kafka.disallowed.login.modules=</code></pre>
|
||||
<p>To disable more loginModules, update the system property with a comma-separated list of loginModule names. Make sure to explicitly add the <b>JndiLoginModule</b> module name to the comma-separated list, as shown below.
|
||||
<p><pre class="brush: bash;"> -Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.ibm.security.auth.module.LdapLoginModule,com.ibm.security.auth.module.Krb5LoginModule</pre>
|
||||
<p><pre><code class="language-bash">-Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.ibm.security.auth.module.LdapLoginModule,com.ibm.security.auth.module.Krb5LoginModule</code></pre>
|
||||
<table><tbody>
|
||||
<tr><th>Since:</th><td>3.4.0</td></tr>
|
||||
<tr><th>Default Value:</th><td>com.sun.security.auth.module.JndiLoginModule</td></tr>
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
|
||||
<p>In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json …]</code></pre>
|
||||
<pre><code class="language-bash">$ bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json …]</code></pre>
|
||||
|
||||
<p>The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs:</p>
|
||||
<ul>
|
||||
|
@ -63,7 +63,7 @@
|
|||
|
||||
<p>Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/connect-distributed.sh config/connect-distributed.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/connect-distributed.sh config/connect-distributed.properties</code></pre>
|
||||
|
||||
<p>The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs and statuses in order to achieve the desired number of partitions and replication factors. If the topics are not yet created when starting Kafka Connect, the topics will be auto-created with a default number of partitions and replication factor, which may not be best suited for their usage.</p>
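<p>For example, a minimal sketch of creating these topics up front (the topic names match the defaults in the example <code>config/connect-distributed.properties</code>; the partition counts and replication factors are placeholders to adjust for your cluster, and all three topics should be compacted, with the config topic limited to a single partition):</p>

<pre><code class="language-bash"># topic names assume the defaults in config/connect-distributed.properties; adjust partitions/replication for your cluster
$ bin/kafka-topics.sh --create --topic connect-configs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 3 --config cleanup.policy=compact
$ bin/kafka-topics.sh --create --topic connect-offsets --bootstrap-server localhost:9092 --partitions 25 --replication-factor 3 --config cleanup.policy=compact
$ bin/kafka-topics.sh --create --topic connect-status --bootstrap-server localhost:9092 --partitions 5 --replication-factor 3 --config cleanup.policy=compact</code></pre>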
|
@ -118,7 +118,7 @@
|
|||
|
||||
<p>Throughout the example we'll use schemaless JSON data format. To use schemaless format, we changed the following two lines in <code>connect-standalone.properties</code> from true to false:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">key.converter.schemas.enable
|
||||
<pre><code class="language-text">key.converter.schemas.enable
|
||||
value.converter.schemas.enable</code></pre>
|
||||
|
||||
<p>The file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations:</p>
|
||||
|
@ -129,7 +129,7 @@ value.converter.schemas.enable</code></pre>
|
|||
|
||||
<p>After adding the transformations, the <code>connect-file-source.properties</code> file looks as follows:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">name=local-file-source
|
||||
<pre><code class="language-text">name=local-file-source
|
||||
connector.class=FileStreamSource
|
||||
tasks.max=1
|
||||
file=test.txt
|
||||
|
@ -145,13 +145,13 @@ transforms.InsertSource.static.value=test-file-source</code></pre>
|
|||
|
||||
<p>When we ran the file source connector on our sample file without the transformations and then read the records using <code>kafka-console-consumer.sh</code>, the results were:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">"foo"
|
||||
<pre><code class="language-text">"foo"
|
||||
"bar"
|
||||
"hello world"</code></pre>
|
||||
|
||||
<p>We then create a new file connector, this time after adding the transformations to the configuration file. This time, the results will be:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-json">{"line":"foo","data_source":"test-file-source"}
|
||||
<pre><code class="language-json">{"line":"foo","data_source":"test-file-source"}
|
||||
{"line":"bar","data_source":"test-file-source"}
|
||||
{"line":"hello world","data_source":"test-file-source"}</code></pre>
|
||||
|
||||
|
@ -208,7 +208,7 @@ transforms.InsertSource.static.value=test-file-source</code></pre>
|
|||
|
||||
<p>To do this we need first to filter out the records destined for the topic 'foo'. The Filter transformation removes records from further processing, and can use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. TopicNameMatches's only configuration property is <code>pattern</code> which is a Java regular expression for matching against the topic name. The configuration would look like this:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">transforms=Filter
|
||||
<pre><code class="language-text">transforms=Filter
|
||||
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
|
||||
transforms.Filter.predicate=IsFoo
|
||||
|
||||
|
@ -218,7 +218,7 @@ predicates.IsFoo.pattern=foo</code></pre>
|
|||
|
||||
<p>Next we need to apply ExtractField only when the topic name of the record is not 'bar'. We can't just use TopicNameMatches directly, because that would apply the transformation to matching topic names, not topic names which do <i>not</i> match. The transformation's implicit <code>negate</code> config property allows us to invert the set of records which a predicate matches. Adding the configuration for this to the previous example, we arrive at:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">transforms=Filter,Extract
|
||||
<pre><code class="language-text">transforms=Filter,Extract
|
||||
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
|
||||
transforms.Filter.predicate=IsFoo
|
||||
|
||||
|
@ -253,7 +253,7 @@ predicates.IsBar.pattern=bar</code></pre>
|
|||
This field should contain a list of listeners in the following format: <code>protocol://host:port,protocol2://host2:port2</code>. Currently supported protocols are <code>http</code> and <code>https</code>.
|
||||
For example:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=http://localhost:8080,https://localhost:8443</code></pre>
|
||||
<pre><code class="language-text">listeners=http://localhost:8080,https://localhost:8443</code></pre>
|
||||
|
||||
<p>By default, if no <code>listeners</code> are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration.
By default, it will use the <code>ssl.*</code> settings. If a different configuration is needed for the REST API than for connecting to Kafka brokers, the fields can be prefixed with <code>listeners.https</code>.
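<p>For example, a dedicated keystore and truststore for the REST API could be configured roughly as sketched below (the paths, passwords and port are placeholders; only the <code>listeners.https.</code> prefix and the standard <code>ssl.*</code> option names are assumed):</p>

<pre><code class="language-text"># illustrative values only; adjust paths, passwords and host/port for your environment
listeners=https://localhost:8443
listeners.https.ssl.keystore.location=/var/private/ssl/rest.keystore.jks
listeners.https.ssl.keystore.password=rest-keystore-password
listeners.https.ssl.key.password=rest-key-password
listeners.https.ssl.truststore.location=/var/private/ssl/rest.truststore.jks
listeners.https.ssl.truststore.password=rest-truststore-password</code></pre>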
|
||||
|
@ -315,7 +315,7 @@ predicates.IsBar.pattern=bar</code></pre>
|
|||
<li><code>DELETE /connectors/{name}/offsets</code> - reset the offsets for a connector. The connector must exist and must be in the stopped state (see <a href="#connect_stopconnector"><code>PUT /connectors/{name}/stop</code></a>)</li>
|
||||
<li><code>PATCH /connectors/{name}/offsets</code> - alter the offsets for a connector. The connector must exist and must be in the stopped state (see <a href="#connect_stopconnector"><code>PUT /connectors/{name}/stop</code></a>). The request body should be a JSON object containing a JSON array <code>offsets</code> field, similar to the response body of the <code>GET /connectors/{name}/offsets</code> endpoint.
|
||||
An example request body for the <code>FileStreamSourceConnector</code>:
|
||||
<pre class="line-numbers"><code class="json">{
|
||||
<pre><code class="language-json">{
|
||||
"offsets": [
|
||||
{
|
||||
"partition": {
|
||||
|
@ -328,7 +328,7 @@ predicates.IsBar.pattern=bar</code></pre>
|
|||
]
|
||||
}</code></pre>
|
||||
An example request body for the <code>FileStreamSinkConnector</code>:
|
||||
<pre class="line-numbers"><code class="json">{
|
||||
<pre><code class="language-json">{
|
||||
"offsets": [
|
||||
{
|
||||
"partition": {
|
||||
|
@ -370,7 +370,7 @@ predicates.IsBar.pattern=bar</code></pre>
|
|||
<p>The <code>admin.listeners</code> configuration can be used to configure admin REST APIs on Kafka Connect's REST API server. Similar to the <code>listeners</code> configuration, this field should contain a list of listeners in the following format: <code>protocol://host:port,protocol2://host2:port2</code>. Currently supported protocols are <code>http</code> and <code>https</code>.
|
||||
For example:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">admin.listeners=http://localhost:8080,https://localhost:8443</code></pre>
|
||||
<pre><code class="language-text">admin.listeners=http://localhost:8080,https://localhost:8443</code></pre>
|
||||
|
||||
<p>By default, if <code>admin.listeners</code> is not configured, the admin REST APIs will be available on the regular listeners.</p>
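<p>For example, with the default setup the logging-related admin endpoints are served from the same listener as the rest of the REST API; a quick check could look like the following sketch (the port assumes the default of 8083 and a worker running locally):</p>

<pre><code class="language-bash"># list the current log levels via the admin REST API (assumes the default port 8083)
$ curl http://localhost:8083/admin/loggers</code></pre>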
|
||||
|
||||
|
@ -396,7 +396,7 @@ predicates.IsBar.pattern=bar</code></pre>
|
|||
|
||||
<p>By default connectors exhibit "fail fast" behavior immediately upon an error or exception. This is equivalent to adding the following configuration properties with their defaults to a connector configuration:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text"># disable retries on failure
|
||||
<pre><code class="language-text"># disable retries on failure
|
||||
errors.retry.timeout=0
|
||||
|
||||
# do not log the error and their contexts
|
||||
|
@ -410,7 +410,7 @@ errors.tolerance=none</code></pre>
|
|||
|
||||
<p>These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to setup error handling with multiple retries, logging to the application logs and the <code>my-connector-errors</code> Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text"># retry for at most 10 minutes times waiting up to 30 seconds between consecutive failures
|
||||
<pre><code class="language-text"># retry for at most 10 minutes times waiting up to 30 seconds between consecutive failures
|
||||
errors.retry.timeout=600000
|
||||
errors.retry.delay.max.ms=30000
|
||||
|
||||
|
@ -587,7 +587,7 @@ errors.tolerance=all</code></pre>
|
|||
|
||||
<p>For example, if you only have one connector with the fully-qualified name <code>com.example.MySinkConnector</code>, then only one manifest file must be added to resources in <code>META-INF/services/org.apache.kafka.connect.sink.SinkConnector</code>, and the contents should be similar to the following:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text"># license header or comment
|
||||
<pre><code class="language-text"># license header or comment
|
||||
com.example.MySinkConnector</code></pre>
|
||||
|
||||
<p>You should then verify that your manifests are correct by using the <a href="#connect_plugindiscovery_compatibility">verification steps</a> with a pre-release artifact. If the verification succeeds, you can then release the plugin normally, and operators can upgrade to the compatible version.</p>
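<p>As a quick local sanity check, separate from the verification steps linked above, you can also inspect the packaged artifact directly; the JAR name below is only an illustration:</p>

<pre><code class="language-bash"># print the ServiceLoader manifest bundled in the plugin JAR (hypothetical file name)
$ unzip -p my-sink-connector.jar META-INF/services/org.apache.kafka.connect.sink.SinkConnector
# license header or comment
com.example.MySinkConnector</code></pre>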
|
||||
|
@ -627,7 +627,7 @@ com.example.MySinkConnector</code></pre>
|
|||
<h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector Example</a></h5>
|
||||
|
||||
<p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Pick a package and class name, these examples will use the <code>FileStreamSourceConnector</code> but substitute your own class name where appropriate. In order to <a href="#connect_plugindiscovery">make the plugin discoverable at runtime</a>, add a ServiceLoader manifest to your resources in <code>META-INF/services/org.apache.kafka.connect.source.SourceConnector</code> with your fully-qualified class name on a single line:</p>
|
||||
<pre class="line-numbers"><code class="language-resource">com.example.FileStreamSourceConnector</code></pre>
|
||||
<pre><code class="language-resource">com.example.FileStreamSourceConnector</code></pre>
|
||||
|
||||
<p>Create a class that inherits from <code>SourceConnector</code> and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size):</p>
|
||||
|
||||
|
@ -704,6 +704,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
|
|||
@Override
|
||||
public synchronized void stop() {
|
||||
stream.close();
|
||||
}
|
||||
}</code></pre>
|
||||
|
||||
<p>These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
|
||||
|
@ -754,6 +755,7 @@ public List<SourceRecord> poll() throws InterruptedException {
|
|||
public abstract void put(Collection<SinkRecord> records);
|
||||
|
||||
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
|
||||
}
|
||||
}</code></pre>
|
||||
|
||||
<p>The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>: Kafka topic, partition, offset, the event key and value, and optional headers.</p>
|
||||
|
|
|
@ -26,14 +26,12 @@
|
|||
<a href="https://www.docker.com/">Docker</a> is a popular container runtime. Docker images for the JVM based Apache Kafka can be found on <a href="https://hub.docker.com/r/apache/kafka">Docker Hub</a> and are available from version 3.7.0.
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li> Docker image can be pulled from Docker Hub using the following command: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker pull apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
<li> If you want to fetch the latest version of the docker image use following command: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker pull apache/kafka:latest</code></pre>
|
||||
<li> To start the Kafka container using this docker image with default configs and on default port 9092: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker run -p 9092:9092 apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
</ul>
|
||||
<p>The Docker image can be pulled from Docker Hub using the following command:</p>
<pre><code class="language-bash">$ docker pull apache/kafka:{{fullDotVersion}}</code></pre>
<p>If you want to fetch the latest version of the Docker image, use the following command:</p>
<pre><code class="language-bash">$ docker pull apache/kafka:latest</code></pre>
<p>To start the Kafka container using this Docker image with default configs and on the default port 9092:</p>
<pre><code class="language-bash">$ docker run -p 9092:9092 apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
|
||||
<h4 class="anchor-heading">GraalVM Based Native Apache Kafka Docker Image</h4>
|
||||
|
||||
|
@ -42,19 +40,18 @@
|
|||
NOTE: This image is experimental and intended for local development and testing purposes only; it is not recommended for production use.
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li> Docker image can be pulled from Docker Hub using the following command: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker pull apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
<li> If you want to fetch the latest version of the docker image use following command: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker pull apache/kafka-native:latest</code></pre>
|
||||
<li> To start the Kafka container using this docker image with default configs and on default port 9092: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker run -p 9092:9092 apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
</ul>
|
||||
<p>The Docker image can be pulled from Docker Hub using the following command:</p>
<pre><code class="language-bash">$ docker pull apache/kafka-native:{{fullDotVersion}}</code></pre>
<p>If you want to fetch the latest version of the Docker image, use the following command:</p>
<pre><code class="language-bash">$ docker pull apache/kafka-native:latest</code></pre>
<p>To start the Kafka container using this Docker image with default configs and on the default port 9092:</p>
<pre><code class="language-bash">$ docker run -p 9092:9092 apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
|
||||
|
||||
<h4 class="anchor-heading">Usage guide</h4>
|
||||
|
||||
<p>
|
||||
Detailed instructions for using the docker image are mentioned <a href="https://github.com/apache/kafka/blob/trunk/docker/examples/README.md">here</a>.
|
||||
Detailed instructions for using the Docker image can be found <a href="https://github.com/apache/kafka/blob/trunk/docker/examples/README.md">here</a>.
|
||||
</p>
|
||||
</script>
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<h4 class="anchor-heading"><a id="recordbatch" class="anchor-link"></a><a href="#recordbatch">5.3.1 Record Batch</a></h4>
|
||||
<p> The following is the on-disk format of a RecordBatch. </p>
|
||||
<pre class="line-numbers"><code class="language-text">baseOffset: int64
|
||||
<pre><code class="language-text">baseOffset: int64
|
||||
batchLength: int32
|
||||
partitionLeaderEpoch: int32
|
||||
magic: int8 (current magic value is 2)
|
||||
|
@ -75,13 +75,13 @@ records: [Record]</code></pre>
|
|||
<h5 class="anchor-heading"><a id="controlbatch" class="anchor-link"></a><a href="#controlbatch">5.3.1.1 Control Batches</a></h5>
|
||||
<p>A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.</p>
|
||||
<p> The key of a control record conforms to the following schema: </p>
|
||||
<pre class="line-numbers"><code class="language-text">version: int16 (current version is 0)
|
||||
<pre><code class="language-text">version: int16 (current version is 0)
|
||||
type: int16 (0 indicates an abort marker, 1 indicates a commit)</code></pre>
|
||||
<p>The schema for the value of a control record is dependent on the type. The value is opaque to clients.</p>
|
||||
|
||||
<h4 class="anchor-heading"><a id="record" class="anchor-link"></a><a href="#record">5.3.2 Record</a></h4>
|
||||
<p>Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below. </p>
|
||||
<pre class="line-numbers"><code class="language-text">length: varint
|
||||
<pre><code class="language-text">length: varint
|
||||
attributes: int8
|
||||
bit 0~7: unused
|
||||
timestampDelta: varlong
|
||||
|
@ -92,7 +92,7 @@ valueLen: varint
|
|||
value: byte[]
|
||||
Headers => [Header]</code></pre>
|
||||
<h5 class="anchor-heading"><a id="recordheader" class="anchor-link"></a><a href="#recordheader">5.3.2.1 Record Header</a></h5>
|
||||
<pre class="line-numbers"><code class="language-text">headerKeyLength: varint
|
||||
<pre><code class="language-text">headerKeyLength: varint
|
||||
headerKey: String
|
||||
headerValueLength: varint
|
||||
Value: byte[]</code></pre>
|
||||
|
@ -106,7 +106,7 @@ Value: byte[]</code></pre>
|
|||
</p>
|
||||
|
||||
<b>Message Set:</b><br>
|
||||
<pre class="line-numbers"><code class="language-text">MessageSet (Version: 0) => [offset message_size message]
|
||||
<pre><code class="language-text">MessageSet (Version: 0) => [offset message_size message]
|
||||
offset => INT64
|
||||
message_size => INT32
|
||||
message => crc magic_byte attributes key value
|
||||
|
@ -120,7 +120,7 @@ message => crc magic_byte attributes key value
|
|||
bit 3~7: unused
|
||||
key => BYTES
|
||||
value => BYTES</code></pre>
|
||||
<pre class="line-numbers"><code class="language-text">MessageSet (Version: 1) => [offset message_size message]
|
||||
<pre><code class="language-text">MessageSet (Version: 1) => [offset message_size message]
|
||||
offset => INT64
|
||||
message_size => INT32
|
||||
message => crc magic_byte attributes timestamp key value
|
||||
|
@ -190,7 +190,7 @@ message => crc magic_byte attributes timestamp key value
|
|||
|
||||
<p> The following is the format of the results sent to the consumer.
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">MessageSetSend (fetch result)
|
||||
<pre><code class="language-text">MessageSetSend (fetch result)
|
||||
|
||||
total length : 4 bytes
|
||||
error code : 2 bytes
|
||||
|
@ -198,7 +198,7 @@ message 1 : x bytes
|
|||
...
|
||||
message n : x bytes</code></pre>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">MultiMessageSetSend (multiFetch result)
|
||||
<pre><code class="language-text">MultiMessageSetSend (multiFetch result)
|
||||
|
||||
total length : 4 bytes
|
||||
error code : 2 bytes
|
||||
|
@ -264,7 +264,7 @@ messageSetSend n</code></pre>
|
|||
</p>
|
||||
|
||||
<h4 class="anchor-heading"><a id="impl_zkbroker" class="anchor-link"></a><a href="#impl_zkbroker">Broker Node Registry</a></h4>
|
||||
<pre class="line-numbers"><code class="language-json">/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)</code></pre>
|
||||
<pre><code class="language-json">/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)</code></pre>
|
||||
<p>
|
||||
This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) results in an error.
|
||||
</p>
|
||||
|
@ -272,7 +272,7 @@ messageSetSend n</code></pre>
|
|||
Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
|
||||
</p>
|
||||
<h4 class="anchor-heading"><a id="impl_zktopic" class="anchor-link"></a><a href="#impl_zktopic">Broker Topic Registry</a></h4>
|
||||
<pre class="line-numbers"><code class="language-json">/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)</code></pre>
|
||||
<pre><code class="language-json">/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)</code></pre>
|
||||
|
||||
<p>
|
||||
Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.
|
||||
|
|
docs/ops.html (382 changes): File diff suppressed because it is too large
|
@ -182,7 +182,7 @@ Kafka request. SASL/GSSAPI authentication is performed starting with this packet
|
|||
|
||||
<p>All requests and responses originate from the following grammar, which will be incrementally described throughout the rest of this document:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">RequestOrResponse => Size (RequestMessage | ResponseMessage)
|
||||
<pre><code class="language-text">RequestOrResponse => Size (RequestMessage | ResponseMessage)
|
||||
Size => int32</code></pre>
|
||||
|
||||
<table class="data-table"><tbody>
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
the latest Kafka release and extract it:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
|
||||
<pre><code class="language-bash">$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
|
||||
$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
|
||||
</div>
|
||||
|
||||
|
@ -42,83 +42,54 @@ $ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
|
|||
<a href="#quickstart_startserver">Step 2: Start the Kafka environment</a>
|
||||
</h4>
|
||||
|
||||
<p class="note">
|
||||
NOTE: Your local environment must have Java 8+ installed.
|
||||
</p>
|
||||
<p class="note">NOTE: Your local environment must have Java 8+ installed.</p>
|
||||
|
||||
<p>
|
||||
Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration follow one of the sections below but not both.
|
||||
</p>
|
||||
<p>Apache Kafka can be started using KRaft or ZooKeeper. To get started with either configuration, follow one of the sections below, but not both.</p>
|
||||
|
||||
<h5>
|
||||
Kafka with ZooKeeper
|
||||
</h5>
|
||||
|
||||
<p>
|
||||
Run the following commands in order to start all services in the correct order:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash"># Start the ZooKeeper service
|
||||
$ bin/zookeeper-server-start.sh config/zookeeper.properties</code></pre>
|
||||
|
||||
<p>
|
||||
Open another terminal session and run:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash"># Start the Kafka broker service
|
||||
$ bin/kafka-server-start.sh config/server.properties</code></pre>
|
||||
|
||||
<p>
|
||||
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
|
||||
</p>
|
||||
|
||||
<h5>
|
||||
Kafka with KRaft
|
||||
</h5>
|
||||
<h5>Kafka with KRaft</h5>
|
||||
|
||||
<p>Kafka can be run in KRaft mode using local scripts and downloaded files, or using the Docker image. Follow one of the sections below, but not both, to start the Kafka server.</p>
|
||||
|
||||
<h5>Using downloaded files</h5>
|
||||
|
||||
<p>
|
||||
Generate a Cluster UUID
|
||||
</p>
|
||||
<p>Generate a Cluster UUID</p>
|
||||
<pre><code class="language-bash">$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"</code></pre>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"</code></pre>
|
||||
<p>Format Log Directories</p>
|
||||
<pre><code class="language-bash">$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties</code></pre>
|
||||
|
||||
<p>
|
||||
Format Log Directories
|
||||
</p>
|
||||
<p>Start the Kafka Server</p>
|
||||
<pre><code class="language-bash">$ bin/kafka-server-start.sh config/kraft/server.properties</code></pre>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties</code></pre>
|
||||
<h5>Kafka with ZooKeeper</h5>
|
||||
|
||||
<p>
|
||||
Start the Kafka Server
|
||||
</p>
|
||||
<p>Run the following commands in order to start all services in the correct order:</p>
|
||||
<pre><code class="language-bash"># Start the ZooKeeper service
|
||||
$ bin/zookeeper-server-start.sh config/zookeeper.properties</code></pre>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-server-start.sh config/kraft/server.properties</code></pre>
|
||||
<p>Open another terminal session and run:</p>
|
||||
<pre><code class="language-bash"># Start the Kafka broker service
|
||||
$ bin/kafka-server-start.sh config/server.properties</code></pre>
|
||||
|
||||
<p>Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.</p>
|
||||
|
||||
<h5>Using JVM Based Apache Kafka Docker Image</h5>
|
||||
|
||||
<ul>
|
||||
<li> Get the docker image: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker pull apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
<li> Start the kafka docker container: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker run -p 9092:9092 apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
</ul>
|
||||
<p> Get the Docker image:</p>
|
||||
<pre><code class="language-bash">$ docker pull apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
|
||||
<p> Start the Kafka Docker container: </p>
|
||||
<pre><code class="language-bash">$ docker run -p 9092:9092 apache/kafka:{{fullDotVersion}}</code></pre>
|
||||
|
||||
<h5>Using GraalVM Based Native Apache Kafka Docker Image</h5>
|
||||
|
||||
<ul>
|
||||
<li> Get the docker image: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker pull apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
<li> Start the kafka docker container: </li>
|
||||
<pre class="line-numbers"><code class="language-bash">$ docker run -p 9092:9092 apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
</ul>
|
||||
<p>Get the Docker image:</p>
|
||||
<pre><code class="language-bash">$ docker pull apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
|
||||
<p>
|
||||
Once the Kafka server has successfully launched, you will have a basic Kafka environment running and ready to use.
|
||||
</p>
|
||||
<p>Start the Kafka Docker container:</p>
|
||||
<pre><code class="language-bash">$ docker run -p 9092:9092 apache/kafka-native:{{fullDotVersion}}</code></pre>
|
||||
|
||||
<p>Once the Kafka server has successfully launched, you will have a basic Kafka environment running and ready to use.</p>
|
||||
</div>
|
||||
|
||||
<div class="quickstart-step">
|
||||
|
@ -145,7 +116,7 @@ $ bin/kafka-server-start.sh config/server.properties</code></pre>
|
|||
So before you can write your first events, you must create a topic. Open another terminal session and run:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092</code></pre>
|
||||
|
||||
<p>
|
||||
All of Kafka's command line tools have additional options: run the <code>kafka-topics.sh</code> command without any
|
||||
|
@ -154,7 +125,7 @@ $ bin/kafka-server-start.sh config/server.properties</code></pre>
|
|||
of the new topic:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
|
||||
<pre><code class="language-bash">$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
|
||||
Topic: quickstart-events TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1 ReplicationFactor: 1 Configs:
|
||||
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0</code></pre>
|
||||
</div>
|
||||
|
@ -176,13 +147,11 @@ Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0</code></
|
|||
By default, each line you enter will result in a separate event being written to the topic.
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
|
||||
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
|
||||
>This is my first event
|
||||
>This is my second event</code></pre>
|
||||
|
||||
<p>
|
||||
You can stop the producer client with <code>Ctrl-C</code> at any time.
|
||||
</p>
|
||||
<p>You can stop the producer client with <code>Ctrl-C</code> at any time.</p>
|
||||
</div>
|
||||
|
||||
<div class="quickstart-step">
|
||||
|
@ -193,7 +162,7 @@ Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0</code></
|
|||
|
||||
<p>Open another terminal session and run the console consumer client to read the events you just created:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
|
||||
<pre><code class="language-bash">$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
|
||||
This is my first event
|
||||
This is my second event</code></pre>
|
||||
|
||||
|
@ -237,15 +206,15 @@ This is my second event</code></pre>
|
|||
Edit the <code class="language-bash">config/connect-standalone.properties</code> file, add or change the <code>plugin.path</code> configuration property to match the following, and save the file:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar >> config/connect-standalone.properties"</code></pre>
|
||||
<pre><code class="language-bash">$ echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar" >> config/connect-standalone.properties</code></pre>
|
||||
|
||||
<p>
|
||||
Then, start by creating some seed data to test with:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ echo -e "foo\nbar" > test.txt</code></pre>
|
||||
<pre><code class="language-bash">$ echo -e "foo\nbar" > test.txt</code></pre>
|
||||
Or on Windows:
|
||||
<pre class="line-numbers"><code class="language-bash">$ echo foo> test.txt
|
||||
<pre><code class="language-bash">$ echo foo > test.txt
|
||||
$ echo bar >> test.txt</code></pre>
|
||||
|
||||
<p>
|
||||
|
@ -256,7 +225,7 @@ $ echo bar>> test.txt</code></pre>
|
|||
class to instantiate, and any other configuration required by the connector.
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties</code></pre>
|
||||
|
||||
<p>
|
||||
These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier
|
||||
|
@ -273,7 +242,7 @@ $ echo bar>> test.txt</code></pre>
|
|||
</p>
|
||||
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ more test.sink.txt
|
||||
<pre><code class="language-bash">$ more test.sink.txt
|
||||
foo
|
||||
bar</code></pre>
|
||||
|
||||
|
@ -283,14 +252,14 @@ bar</code></pre>
|
|||
</p>
|
||||
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
|
||||
<pre><code class="language-bash">$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
|
||||
{"schema":{"type":"string","optional":false},"payload":"foo"}
|
||||
{"schema":{"type":"string","optional":false},"payload":"bar"}
|
||||
…</code></pre>
|
||||
|
||||
<p>The connectors continue to process data, so we can add data to the file and see it move through the pipeline:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ echo Another line>> test.txt</code></pre>
|
||||
<pre><code class="language-bash">$ echo "Another line" >> test.txt</code></pre>
|
||||
|
||||
<p>You should see the line appear in the console consumer output and in the sink file.</p>
|
||||
|
||||
|
@ -360,7 +329,7 @@ wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.L
|
|||
along the way, run the command:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs</code></pre>
|
||||
<pre><code class="language-bash">$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs</code></pre>
|
||||
|
||||
</div>
|
||||
|
||||
|
|
|
@ -50,13 +50,13 @@
|
|||
list of the listeners to enable. At least one listener must be defined on each server. The format
|
||||
of each listener defined in <code>listeners</code> is given below:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">{LISTENER_NAME}://{hostname}:{port}</code></pre>
|
||||
<pre><code class="language-text">{LISTENER_NAME}://{hostname}:{port}</code></pre>
|
||||
|
||||
<p>The <code>LISTENER_NAME</code> is usually a descriptive name which defines the purpose of
|
||||
the listener. For example, many configurations use a separate listener for client traffic,
|
||||
so they might refer to the corresponding listener as <code>CLIENT</code> in the configuration:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=CLIENT://localhost:9092</code></pre>
|
||||
<pre><code class="language-text">listeners=CLIENT://localhost:9092</code></pre>
|
||||
|
||||
<p>The security protocol of each listener is defined in a separate configuration:
|
||||
<code>listener.security.protocol.map</code>. The value is a comma-separated list
|
||||
|
@ -64,7 +64,7 @@
|
|||
configuration specifies that the <code>CLIENT</code> listener will use SSL while the
|
||||
<code>BROKER</code> listener will use plaintext.</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
|
||||
<pre><code class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
|
||||
|
||||
<p>Possible options (case-insensitive) for the security protocol are given below:</p>
|
||||
<ol>
|
||||
|
@ -82,7 +82,7 @@
|
|||
we could skip the definition of the <code>CLIENT</code> and <code>BROKER</code> listeners
|
||||
using the following definition:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093</code></pre>
|
||||
<pre><code class="language-text">listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093</code></pre>
|
||||
|
||||
<p>However, we recommend users to provide explicit names for the listeners since it
|
||||
makes the intended usage of each listener clearer.</p>
|
||||
|
@ -117,7 +117,7 @@
|
|||
any security properties that are needed to configure it. For example, we might
|
||||
use the following configuration on a standalone broker:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">process.roles=broker
|
||||
<pre><code class="language-text">process.roles=broker
|
||||
listeners=BROKER://localhost:9092
|
||||
inter.broker.listener.name=BROKER
|
||||
controller.quorum.voters=0@localhost:9093
|
||||
|
@ -134,7 +134,7 @@ listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>
|
|||
is similar. The only difference is that the controller listener must be included in
|
||||
<code>listeners</code>:</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">process.roles=broker,controller
|
||||
<pre><code class="language-text">process.roles=broker,controller
|
||||
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
|
||||
inter.broker.listener.name=BROKER
|
||||
controller.quorum.voters=0@localhost:9093
|
||||
|
@ -176,7 +176,7 @@ listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>
|
|||
The tool supports two different keystore formats: the Java-specific JKS format, which has been deprecated, and PKCS12.
PKCS12 is the default format as of Java version 9; to ensure this format is used regardless of the Java version in use, all following
commands explicitly specify the PKCS12 format.
|
||||
<pre class="line-numbers"><code class="language-bash">> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12</code></pre>
|
||||
<pre><code class="language-bash">$ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12</code></pre>
|
||||
You need to specify two parameters in the above command:
|
||||
<ol>
|
||||
<li>keystorefile: the keystore file that stores the keys (and later the certificate) for this broker. The keystore file contains the private
|
||||
|
@ -192,7 +192,7 @@ listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>
|
|||
authentication purposes.<br>
|
||||
To generate certificate signing requests run the following command for all server keystores created so far.
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
|
||||
<pre><code class="language-bash">$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
|
||||
This command assumes that you want to add hostname information to the certificate; if this is not the case, you can omit the extension parameter <code>-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code>. Please see below for more information on this.
|
||||
|
||||
<h5>Host Name Verification</h5>
|
||||
|
@ -205,7 +205,7 @@ listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>
|
|||
Server host name verification may be disabled by setting <code>ssl.endpoint.identification.algorithm</code> to an empty string.<br>
|
||||
For dynamically configured broker listeners, hostname verification may be disabled using <code>kafka-configs.sh</code>:<br>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="</code></pre>
|
||||
|
||||
<p><b>Note:</b></p>
|
||||
Normally there is no good reason to disable hostname verification apart from being the quickest way to "just get it to work" followed
|
||||
|
@ -228,7 +228,7 @@ listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>
|
|||
|
||||
|
||||
To add a SAN field append the following argument <code> -ext SAN=DNS:{FQDN},IP:{IPADDRESS}</code> to the keytool command:
|
||||
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
|
||||
<pre><code class="language-bash">$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
|
||||
</li>
|
||||
|
||||
<li><h4 class="anchor-heading"><a id="security_ssl_ca" class="anchor-link"></a><a href="#security_ssl_ca">Creating your own CA</a></h4>
|
||||
|
@ -252,7 +252,7 @@ listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>
|
|||
CA keypair.<br>
|
||||
|
||||
Save the following listing into a file called openssl-ca.cnf and adjust the values for validity and common attributes as necessary.
|
||||
<pre class="line-numbers"><code class="language-bash">HOME = .
|
||||
<pre><code class="language-bash">HOME = .
|
||||
RANDFILE = $ENV::HOME/.rnd
|
||||
|
||||
####################################################################
|
||||
|
@ -337,25 +337,25 @@ keyUsage = digitalSignature, keyEncipherment</code></pre>
|
|||
Then create a database and serial number file; these will be used to keep track of which certificates were signed with this CA. Both of
these are simply text files that reside in the same directory as your CA keys.
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> echo 01 > serial.txt
|
||||
> touch index.txt</code></pre>
|
||||
<pre><code class="language-bash">$ echo 01 > serial.txt
|
||||
$ touch index.txt</code></pre>
|
||||
|
||||
With these steps done you are now ready to generate your CA that will be used to sign certificates later.
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM</code></pre>
|
||||
<pre><code class="language-bash">$ openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM</code></pre>
|
||||
|
||||
The CA is simply a public/private key pair and certificate that is signed by itself, and is only intended to sign other certificates.<br>
This keypair should be kept very safe; if someone gains access to it, they can create and sign certificates that will be trusted by your
infrastructure, which means they will be able to impersonate anybody when connecting to any service that trusts this CA.<br>
|
||||
|
||||
The next step is to add the generated CA to the <b>clients' truststore</b> so that the clients can trust this CA:
|
||||
<pre class="line-numbers"><code class="language-bash">> keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
|
||||
<pre><code class="language-bash">$ keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
|
||||
|
||||
<b>Note:</b>
|
||||
If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the
|
||||
<a href="#brokerconfigs">Kafka brokers config</a> then you must provide a truststore for the Kafka brokers as well and it should have
|
||||
all the CA certificates that clients' keys were signed by.
|
||||
<pre class="line-numbers"><code class="language-bash">> keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
|
||||
<pre><code class="language-bash">$ keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
|
||||
|
||||
In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates
|
||||
that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that
|
||||
|
@ -366,11 +366,11 @@ keyUsage = digitalSignature, keyEncipherment</code></pre>
|
|||
</li>
|
||||
<li><h4 class="anchor-heading"><a id="security_ssl_signing" class="anchor-link"></a><a href="#security_ssl_signing">Signing the certificate</a></h4>
|
||||
Then sign it with the CA:
|
||||
<pre class="line-numbers"><code class="language-bash">> openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}</code></pre>
|
||||
<pre><code class="language-bash">$ openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}</code></pre>
|
||||
|
||||
Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
|
||||
<pre class="line-numbers"><code class="language-bash">> keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
|
||||
> keytool -keystore {keystore} -alias localhost -import -file cert-signed</code></pre>
|
||||
<pre><code class="language-bash">$ keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
|
||||
$ keytool -keystore {keystore} -alias localhost -import -file cert-signed</code></pre>
|
||||
|
||||
The definitions of the parameters are the following:
|
||||
<ol>
|
||||
|
@ -439,7 +439,7 @@ keyUsage = digitalSignature, keyEncipherment</code></pre>
|
|||
harder for a malicious party to obtain certificates with potentially misleading or fraudulent values.
|
||||
	It is advisable to double-check that signed certificates contain all requested SAN fields, so that proper hostname verification is possible.
|
||||
The following command can be used to print certificate details to the console, which should be compared with what was originally requested:
|
||||
<pre class="line-numbers"><code class="language-bash">> openssl x509 -in certificate.crt -text -noout</code></pre>
|
||||
<pre><code class="language-bash">$ openssl x509 -in certificate.crt -text -noout</code></pre>
|
||||
</li>
|
||||
</ol>
|
||||
</li>
|
||||
|
@ -447,10 +447,10 @@ keyUsage = digitalSignature, keyEncipherment</code></pre>
|
|||
<li><h4 class="anchor-heading"><a id="security_configbroker" class="anchor-link"></a><a href="#security_configbroker">Configuring Kafka Brokers</a></h4>
|
||||
|
||||
If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
|
||||
<pre class="line-numbers"><code class="language-text">listeners=PLAINTEXT://host.name:port,SSL://host.name:port</code></pre>
|
||||
<pre><code class="language-text">listeners=PLAINTEXT://host.name:port,SSL://host.name:port</code></pre>
|
||||
|
||||
	The following SSL configs are needed on the broker side:
|
||||
<pre class="line-numbers"><code class="language-text">ssl.keystore.location=/var/private/ssl/server.keystore.jks
|
||||
<pre><code class="language-text">ssl.keystore.location=/var/private/ssl/server.keystore.jks
|
||||
ssl.keystore.password=test1234
|
||||
ssl.key.password=test1234
|
||||
ssl.truststore.location=/var/private/ssl/server.truststore.jks
|
||||
|
@ -468,7 +468,7 @@ ssl.truststore.password=test1234</code></pre>
|
|||
<li>ssl.secure.random.implementation=SHA1PRNG</li>
|
||||
</ol>
|
||||
	If you want to enable SSL for inter-broker communication, add the following to the server.properties file (it defaults to PLAINTEXT):
|
||||
<pre class="line-numbers"><code class="language-text">security.inter.broker.protocol=SSL</code></pre>
|
||||
<pre><code class="language-text">security.inter.broker.protocol=SSL</code></pre>
|
||||
|
||||
<p>
|
||||
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the <a href="https://www.oracle.com/technetwork/java/javase/downloads/index.html">JCE Unlimited Strength Jurisdiction Policy Files</a> must be obtained and installed in the JDK/JRE. See the
|
||||
|
@ -484,12 +484,12 @@ ssl.truststore.password=test1234</code></pre>
|
|||
</p>
|
||||
|
||||
	Once you start the broker, you should be able to see the following in the server.log:
|
||||
<pre class="line-numbers"><code class="language-text">with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</code></pre>
|
||||
<pre><code class="language-text">with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</code></pre>
|
||||
|
||||
	To quickly check if the server keystore and truststore are set up properly, you can run the following command:
|
||||
<pre class="line-numbers"><code class="language-bash">> openssl s_client -debug -connect localhost:9093 -tls1</code></pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
|
||||
<pre><code class="language-bash">$ openssl s_client -debug -connect localhost:9093 -tls1</code></pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
|
||||
	In the output of this command you should see the server's certificate:
|
||||
<pre class="line-numbers"><code class="language-text">-----BEGIN CERTIFICATE-----
|
||||
<pre><code class="language-text">-----BEGIN CERTIFICATE-----
|
||||
{variable sized random bytes}
|
||||
-----END CERTIFICATE-----
|
||||
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
|
||||
|
@ -499,14 +499,14 @@ issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.co
|
|||
<li><h4 class="anchor-heading"><a id="security_configclients" class="anchor-link"></a><a href="#security_configclients">Configuring Kafka Clients</a></h4>
|
||||
	SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported. The configs for SSL will be the same for both producer and consumer.<br>
|
||||
If client authentication is not required in the broker, then the following is a minimal configuration example:
|
||||
<pre class="line-numbers"><code class="language-text">security.protocol=SSL
|
||||
<pre><code class="language-text">security.protocol=SSL
|
||||
ssl.truststore.location=/var/private/ssl/client.truststore.jks
|
||||
ssl.truststore.password=test1234</code></pre>
|
||||
|
||||
	Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
|
||||
|
||||
	If client authentication is required, then a keystore must be created as in step 1 and the following must also be configured:
|
||||
<pre class="line-numbers"><code class="language-text">ssl.keystore.location=/var/private/ssl/client.keystore.jks
|
||||
<pre><code class="language-text">ssl.keystore.location=/var/private/ssl/client.keystore.jks
|
||||
ssl.keystore.password=test1234
|
||||
ssl.key.password=test1234</code></pre>
|
||||
|
||||
|
@ -520,8 +520,8 @@ ssl.key.password=test1234</code></pre>
|
|||
</ol>
|
||||
<br>
|
||||
Examples using console-producer and console-consumer:
|
||||
<pre class="line-numbers"><code class="language-bash">> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
|
||||
> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
|
||||
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</code></pre>
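	To verify that the client properties can complete the TLS handshake without producing or consuming any data, the broker's API versions can be queried with the same configuration file, for example:
	<pre><code class="language-bash">$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9093 --command-config client-ssl.properties</code></pre>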
|
||||
</li>
|
||||
</ol>
|
||||
<h3 class="anchor-heading"><a id="security_sasl" class="anchor-link"></a><a href="#security_sasl">7.4 Authentication using SASL</a></h3>
|
||||
|
@ -561,7 +561,7 @@ ssl.key.password=test1234</code></pre>
|
|||
login module may be specified in the config value. If multiple mechanisms are configured on a
|
||||
listener, configs must be provided for each mechanism using the listener and mechanism prefix.
|
||||
For example,</p>
|
||||
<pre class="line-numbers"><code class="language-text">listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
||||
<pre><code class="language-text">listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
||||
username="admin" \
|
||||
password="admin-secret";
|
||||
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
|
||||
|
@ -617,7 +617,7 @@ listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.p
|
|||
<a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a>.
|
||||
For example, <a href="#security_sasl_gssapi_clientconfig">GSSAPI</a>
|
||||
credentials may be configured as:
|
||||
<pre class="line-numbers"><code class="language-text">KafkaClient {
|
||||
<pre><code class="language-text">KafkaClient {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
|
@ -626,7 +626,7 @@ listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.p
|
|||
};</code></pre>
|
||||
</li>
|
||||
    <li>Pass the JAAS config file location as a JVM parameter to each client JVM. For example:
|
||||
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</code></pre></li>
|
||||
<pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</code></pre></li>
|
||||
</ol>
|
||||
</li>
|
||||
</ol>
|
||||
|
@ -695,14 +695,14 @@ listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.p
|
|||
<li><b>Create Kerberos Principals</b><br>
|
||||
If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).<br>
|
||||
        If you have installed your own Kerberos server, you will need to create these principals yourself using the following commands (a way to verify the resulting keytab is shown after this list):
|
||||
<pre class="line-numbers"><code class="language-bash">> sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
|
||||
> sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</code></pre></li>
|
||||
<pre><code class="language-bash">$ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
|
||||
$ sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</code></pre></li>
|
||||
        <li><b>Make sure all hosts are reachable using hostnames</b> - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.</li>
|
||||
</ol>
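    If you created the keytabs yourself, you can verify that the expected principal was written into a keytab by listing its entries, for example:
    <pre><code class="language-bash">$ klist -kt /etc/security/keytabs/{keytabname}.keytab</code></pre>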
|
||||
<li><h5 class="anchor-heading"><a id="security_sasl_kerberos_brokerconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
|
||||
<ol>
|
||||
        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
|
||||
<pre class="line-numbers"><code class="language-text">KafkaServer {
|
||||
<pre><code class="language-text">KafkaServer {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
|
@ -723,17 +723,17 @@ Client {
|
|||
            allows the broker to log in using the keytab specified in this section. See <a href="#security_jaas_broker">notes</a> for more details on Zookeeper SASL configuration.
|
||||
</li>
|
||||
<li>Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
|
||||
<pre class="line-numbers"><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf
|
||||
<pre><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf
|
||||
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre>
|
||||
</li>
|
||||
        <li>Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.</li>
|
||||
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SASL_PLAINTEXT://host.name:port
|
||||
<pre><code class="language-text">listeners=SASL_PLAINTEXT://host.name:port
|
||||
security.inter.broker.protocol=SASL_PLAINTEXT
|
||||
sasl.mechanism.inter.broker.protocol=GSSAPI
|
||||
sasl.enabled.mechanisms=GSSAPI</code></pre>
|
||||
            We must also configure the service name in server.properties, which should match the principal name of the Kafka brokers. In the above example, the principal is "kafka/kafka1.hostname.com@EXAMPLE.com", so:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.kerberos.service.name=kafka</code></pre>
|
||||
<pre><code class="language-text">sasl.kerberos.service.name=kafka</code></pre>
|
||||
</li>
|
||||
</ol></li>
|
||||
<li><h5 class="anchor-heading"><a id="security_sasl_kerberos_clientconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_clientconfig">Configuring Kafka Clients</a></h5>
|
||||
|
@ -747,7 +747,7 @@ sasl.enabled.mechanisms=GSSAPI</code></pre>
|
|||
The property <code>sasl.jaas.config</code> in producer.properties or consumer.properties describes
|
||||
how clients like producer and consumer can connect to the Kafka Broker. The following is an example
|
||||
configuration for a client using a keytab (recommended for long-running processes):
|
||||
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
|
||||
<pre><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
|
||||
useKeyTab=true \
|
||||
storeKey=true \
|
||||
keyTab="/etc/security/keytabs/kafka_client.keytab" \
|
||||
|
@ -755,7 +755,7 @@ sasl.enabled.mechanisms=GSSAPI</code></pre>
|
|||
|
||||
For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used
|
||||
along with "useTicketCache=true" as in:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
|
||||
<pre><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
|
||||
useTicketCache=true;</code></pre>
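        With this configuration the client picks up the ticket obtained via kinit, so a typical session might look like the following (the principal and the properties file holding the configs above are illustrative):
        <pre><code class="language-bash">$ kinit alice@EXAMPLE.COM
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config consumer.properties</code></pre>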
|
||||
|
||||
JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
|
||||
|
@ -763,9 +763,9 @@ sasl.enabled.mechanisms=GSSAPI</code></pre>
|
|||
<code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</li>
|
||||
        <li>Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting the Kafka client.</li>
|
||||
<li>Optionally pass the krb5 file locations as JVM parameters to each client JVM (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
|
||||
<pre class="line-numbers"><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf</code></pre></li>
|
||||
<pre><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf</code></pre></li>
|
||||
<li>Configure the following properties in producer.properties or consumer.properties:
|
||||
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_PLAINTEXT (or SASL_SSL)
|
||||
<pre><code class="language-text">security.protocol=SASL_PLAINTEXT (or SASL_SSL)
|
||||
sasl.mechanism=GSSAPI
|
||||
sasl.kerberos.service.name=kafka</code></pre></li>
|
||||
</ol>
|
||||
|
@ -781,7 +781,7 @@ sasl.kerberos.service.name=kafka</code></pre></li>
|
|||
<li><h5 class="anchor-heading"><a id="security_sasl_plain_brokerconfig" class="anchor-link"></a><a href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
|
||||
<ol>
|
||||
        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example:
|
||||
<pre class="line-numbers"><code class="language-text">KafkaServer {
|
||||
<pre><code class="language-text">KafkaServer {
|
||||
org.apache.kafka.common.security.plain.PlainLoginModule required
|
||||
username="admin"
|
||||
password="admin-secret"
|
||||
|
@ -794,9 +794,9 @@ sasl.kerberos.service.name=kafka</code></pre></li>
|
|||
the passwords for all users that connect to the broker and the broker validates all client connections including
|
||||
those from other brokers using these properties.</li>
|
||||
        <li>Pass the JAAS config file location as a JVM parameter to each Kafka broker:
|
||||
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
|
||||
<pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
|
||||
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SASL_SSL://host.name:port
|
||||
<pre><code class="language-text">listeners=SASL_SSL://host.name:port
|
||||
security.inter.broker.protocol=SASL_SSL
|
||||
sasl.mechanism.inter.broker.protocol=PLAIN
|
||||
sasl.enabled.mechanisms=PLAIN</code></pre></li>
|
||||
|
@ -809,7 +809,7 @@ sasl.enabled.mechanisms=PLAIN</code></pre></li>
|
|||
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
|
||||
The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
|
||||
The following is an example configuration for a client for the PLAIN mechanism:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
|
||||
<pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
|
||||
username="alice" \
|
||||
password="alice-secret";</code></pre>
|
||||
<p>The options <code>username</code> and <code>password</code> are used by clients to configure
|
||||
|
@ -821,7 +821,7 @@ sasl.enabled.mechanisms=PLAIN</code></pre></li>
|
|||
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
|
||||
<code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
|
||||
<li>Configure the following properties in producer.properties or consumer.properties:
|
||||
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_SSL
|
||||
<pre><code class="language-text">security.protocol=SASL_SSL
|
||||
sasl.mechanism=PLAIN</code></pre></li>
|
||||
</ol>
|
||||
</li>
|
||||
|
@ -858,21 +858,21 @@ sasl.mechanism=PLAIN</code></pre></li>
|
|||
before Kafka brokers are started. Client credentials may be created and updated dynamically and updated
|
||||
credentials will be used to authenticate new connections.</p>
|
||||
<p>Create SCRAM credentials for user <i>alice</i> with password <i>alice-secret</i>:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice</code></pre>
|
||||
<p>The default iteration count of 4096 is used if iterations are not specified. A random salt is created
|
||||
    and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey is stored in Zookeeper.
|
||||
See <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a> for details on SCRAM identity and the individual fields.
|
||||
    <p>The following examples also require a user <i>admin</i> for inter-broker communication, which can be created using:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin</code></pre>
|
||||
<p>Existing credentials may be listed using the <i>--describe</i> option:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice</code></pre>
|
||||
<p>Credentials may be deleted for one or more SCRAM mechanisms using the <i>--alter --delete-config</i> option:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice</code></pre>
|
||||
</li>
|
||||
<li><h5 class="anchor-heading"><a id="security_sasl_scram_brokerconfig" class="anchor-link"></a><a href="#security_sasl_scram_brokerconfig">Configuring Kafka Brokers</a></h5>
|
||||
<ol>
|
||||
        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example:
|
||||
<pre class="line-numbers"><code class="language-text">KafkaServer {
|
||||
<pre><code class="language-text">KafkaServer {
|
||||
org.apache.kafka.common.security.scram.ScramLoginModule required
|
||||
username="admin"
|
||||
password="admin-secret";
|
||||
|
@ -881,9 +881,9 @@ sasl.mechanism=PLAIN</code></pre></li>
|
|||
the broker to initiate connections to other brokers. In this example, <i>admin</i> is the user for
|
||||
inter-broker communication.</li>
|
||||
        <li>Pass the JAAS config file location as a JVM parameter to each Kafka broker:
|
||||
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
|
||||
<pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
|
||||
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SASL_SSL://host.name:port
|
||||
<pre><code class="language-text">listeners=SASL_SSL://host.name:port
|
||||
security.inter.broker.protocol=SASL_SSL
|
||||
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
|
||||
sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
|
||||
|
@ -896,7 +896,7 @@ sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
|
|||
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
|
||||
The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
|
||||
The following is an example configuration for a client for the SCRAM mechanisms:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
||||
<pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
||||
username="alice" \
|
||||
password="alice-secret";</code></pre>
|
||||
|
||||
|
@ -909,7 +909,7 @@ sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
|
|||
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
|
||||
<code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
|
||||
<li>Configure the following properties in producer.properties or consumer.properties:
|
||||
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_SSL
|
||||
<pre><code class="language-text">security.protocol=SASL_SSL
|
||||
sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
|
||||
</ol>
|
||||
</li>
|
||||
|
@ -944,7 +944,7 @@ sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
|
|||
<li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_brokerconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_brokerconfig">Configuring Kafka Brokers</a></h5>
|
||||
<ol>
|
||||
        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example:
|
||||
<pre class="line-numbers"><code class="language-text">KafkaServer {
|
||||
<pre><code class="language-text">KafkaServer {
|
||||
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
|
||||
unsecuredLoginStringClaim_sub="admin";
|
||||
};</code></pre>
|
||||
|
@ -952,9 +952,9 @@ sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
|
|||
the broker when it initiates connections to other brokers. In this example, <i>admin</i> will appear in the
|
||||
subject (<code>sub</code>) claim and will be the user for inter-broker communication.</li>
|
||||
        <li>Pass the JAAS config file location as a JVM parameter to each Kafka broker:
|
||||
<pre class="line-numbers"><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
|
||||
<pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
|
||||
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
|
||||
<pre><code class="language-text">listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
|
||||
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
|
||||
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
|
||||
sasl.enabled.mechanisms=OAUTHBEARER</code></pre></li>
|
||||
|
@ -967,7 +967,7 @@ sasl.enabled.mechanisms=OAUTHBEARER</code></pre></li>
|
|||
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
|
||||
The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
|
||||
            The following is an example configuration for a client for the OAUTHBEARER mechanism:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
|
||||
<pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
|
||||
unsecuredLoginStringClaim_sub="alice";</code></pre>
|
||||
|
||||
<p>The option <code>unsecuredLoginStringClaim_sub</code> is used by clients to configure
|
||||
|
@ -980,7 +980,7 @@ sasl.enabled.mechanisms=OAUTHBEARER</code></pre></li>
|
|||
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
|
||||
<code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
|
||||
<li>Configure the following properties in producer.properties or consumer.properties:
|
||||
<pre class="line-numbers"><code class="language-text">security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
|
||||
<pre><code class="language-text">security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
|
||||
sasl.mechanism=OAUTHBEARER</code></pre></li>
|
||||
<li>The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library.
|
||||
Since it's an optional dependency, users have to configure it as a dependency via their build tool.</li>
|
||||
|
@ -1139,7 +1139,7 @@ sasl.mechanism=OAUTHBEARER</code></pre></li>
|
|||
<li><h4 class="anchor-heading"><a id="security_sasl_multimechanism" class="anchor-link"></a><a href="#security_sasl_multimechanism">Enabling multiple SASL mechanisms in a broker</a></h4>
|
||||
<ol>
|
||||
<li>Specify configuration for the login modules of all enabled mechanisms in the <code>KafkaServer</code> section of the JAAS config file. For example:
|
||||
<pre class="line-numbers"><code class="language-text">KafkaServer {
|
||||
<pre><code class="language-text">KafkaServer {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
|
@ -1152,9 +1152,9 @@ sasl.mechanism=OAUTHBEARER</code></pre></li>
|
|||
user_admin="admin-secret"
|
||||
user_alice="alice-secret";
|
||||
};</code></pre></li>
|
||||
<li>Enable the SASL mechanisms in server.properties: <pre class="line-numbers"><code class="language-text">sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER</code></pre></li>
|
||||
<li>Enable the SASL mechanisms in server.properties: <pre><code class="language-text">sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER</code></pre></li>
|
||||
<li>Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
|
||||
<pre class="line-numbers"><code class="language-text">security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
|
||||
<pre><code class="language-text">security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
|
||||
sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)</code></pre></li>
|
||||
<li>Follow the mechanism-specific steps in <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
|
||||
<a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
|
||||
|
@ -1222,15 +1222,15 @@ sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechani
|
|||
To describe other tokens, a DESCRIBE_TOKEN permission needs to be added on the User resource representing the owner of the token.
|
||||
        Examples using the <code>kafka-delegation-tokens.sh</code> script are given below.</p>
|
||||
<p>Create a delegation token:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1</code></pre>
|
||||
<p>Create a delegation token for a different owner:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1</code></pre>
|
||||
<p>Renew a delegation token:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
|
||||
<p>Expire a delegation token:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
|
||||
<p>Existing tokens can be described using the --describe option:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1</code></pre>
|
||||
</li>
|
||||
<li><h5 class="anchor-heading"><a id="security_token_authentication" class="anchor-link"></a><a href="#security_token_authentication">Token Authentication</a></h5>
|
||||
<p>Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable
|
||||
|
@ -1241,7 +1241,7 @@ sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechani
|
|||
<li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
|
||||
The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
|
||||
The following is an example configuration for a client for the token authentication:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
||||
<pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
||||
username="tokenID123" \
|
||||
password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
|
||||
tokenauth="true";</code></pre>
|
||||
|
@ -1278,9 +1278,9 @@ sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechani
|
|||
Kafka provides default implementations which store ACLs in the cluster metadata (either Zookeeper or the KRaft metadata log).
|
||||
|
||||
For Zookeeper-based clusters, the provided implementation is configured as follows:
|
||||
<pre class="line-numbers"><code class="language-text">authorizer.class.name=kafka.security.authorizer.AclAuthorizer</code></pre>
|
||||
<pre><code class="language-text">authorizer.class.name=kafka.security.authorizer.AclAuthorizer</code></pre>
|
||||
For KRaft clusters, use the following configuration on all nodes (brokers, controllers, or combined broker/controller nodes):
|
||||
<pre class="line-numbers"><code class="language-text">authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer</code></pre>
|
||||
<pre><code class="language-text">authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer</code></pre>
|
||||
|
||||
Kafka ACLs are defined in the general format of "Principal {P} is [Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching ResourcePattern {RP}".
|
||||
You can read more about the ACL structure in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface">KIP-11</a> and
|
||||
|
@ -1288,9 +1288,9 @@ sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechani
|
|||
In order to add, remove, or list ACLs, you can use the Kafka ACL CLI <code>kafka-acls.sh</code>. By default, if no ResourcePatterns match a specific Resource R,
|
||||
then R has no associated ACLs, and therefore no one other than super users is allowed to access R.
|
||||
    If you want to change that behavior, you can include the following in server.properties:
|
||||
<pre class="line-numbers"><code class="language-text">allow.everyone.if.no.acl.found=true</code></pre>
|
||||
<pre><code class="language-text">allow.everyone.if.no.acl.found=true</code></pre>
|
||||
    One can also add super users in server.properties like the following (note that the delimiter is a semicolon since SSL user names may contain commas). The default PrincipalType string "User" is case sensitive.
|
||||
<pre class="line-numbers"><code class="language-text">super.users=User:Bob;User:Alice</code></pre>
|
||||
<pre><code class="language-text">super.users=User:Bob;User:Alice</code></pre>
|
||||
|
||||
<h5 class="anchor-heading"><a id="kraft_principal_forwarding" class="anchor-link"></a><a href="#kraft_principal_forwarding">KRaft Principal Forwarding</a></h5>
|
||||
|
||||
|
@ -1314,11 +1314,11 @@ sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechani
|
|||
string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name.
|
||||
    This also supports lowercase/uppercase options, to force the translated result to be all lowercase or uppercase. This is done by adding a "/L" or "/U" to the end of the rule.
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">RULE:pattern/replacement/
|
||||
<pre><code class="language-text">RULE:pattern/replacement/
|
||||
RULE:pattern/replacement/[LU]</code></pre>
|
||||
|
||||
Example <code>ssl.principal.mapping.rules</code> values are:
|
||||
<pre class="line-numbers"><code class="language-text">RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
|
||||
<pre><code class="language-text">RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
|
||||
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
|
||||
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
|
||||
DEFAULT</code></pre>
|
||||
|
@ -1327,14 +1327,14 @@ DEFAULT</code></pre>
|
|||
and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
|
||||
|
||||
    <br>For advanced use cases, one can customize the name by setting a custom PrincipalBuilder in server.properties like the following:
|
||||
<pre class="line-numbers"><code class="language-text">principal.builder.class=CustomizedPrincipalBuilderClass</code></pre>
|
||||
<pre><code class="language-text">principal.builder.class=CustomizedPrincipalBuilderClass</code></pre>
|
||||
|
||||
<h5 class="anchor-heading"><a id="security_authz_sasl" class="anchor-link"></a><a href="#security_authz_sasl">Customizing SASL User Name</a></h5>
|
||||
|
||||
By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties.
|
||||
    The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as auth_to_local in the <a href="https://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. This also supports an additional lowercase/uppercase rule, to force the translated result to be all lowercase or uppercase. This is done by adding a "/L" or "/U" to the end of the rule. See the formats below for the syntax.
|
||||
    Each rule starts with RULE: and contains an expression in one of the following formats. See the Kerberos documentation for more details.
|
||||
<pre class="line-numbers"><code class="language-text">RULE:[n:string](regexp)s/pattern/replacement/
|
||||
<pre><code class="language-text">RULE:[n:string](regexp)s/pattern/replacement/
|
||||
RULE:[n:string](regexp)s/pattern/replacement/g
|
||||
RULE:[n:string](regexp)s/pattern/replacement//L
|
||||
RULE:[n:string](regexp)s/pattern/replacement/g/L
|
||||
|
@ -1342,7 +1342,7 @@ RULE:[n:string](regexp)s/pattern/replacement//U
|
|||
RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
|
||||
|
||||
An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
|
||||
<pre class="line-numbers"><code class="language-text">sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT</code></pre>
|
||||
<pre><code class="language-text">sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT</code></pre>
|
||||
|
||||
<h4 class="anchor-heading"><a id="security_authz_cli" class="anchor-link"></a><a href="#security_authz_cli">Command Line Interface</a></h4>
|
||||
    The Kafka Authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called <b>kafka-acls.sh</b>. The following lists all the options that the script supports:
|
||||
|
@ -1545,41 +1545,41 @@ RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
|
|||
<ul>
|
||||
<li><b>Adding Acls</b><br>
|
||||
        Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with the following options:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</code></pre>
|
||||
        By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal, we will have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following commands:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</code></pre>
|
||||
Note that <code>--allow-host</code> and <code>--deny-host</code> only support IP addresses (hostnames are not supported).
|
||||
        The above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly, users can add acls to the cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
|
||||
        You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.1"
|
||||
        You can do that by using the wildcard resource '*', e.g. by executing the CLI with the following options:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'</code></pre>
|
||||
You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host".
|
||||
        You can do that by executing the CLI with the following options:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</code></pre>
|
||||
        Note that --resource-pattern-type defaults to 'literal', which only affects resources with the exact same name or, in the case of the wildcard resource name '*', a resource with any name.</li>
|
||||
|
||||
<li><b>Removing Acls</b><br>
|
||||
        Removing acls is pretty much the same. The only difference is that instead of the --add option, users will have to specify the --remove option. To remove the acls added by the first example above, we can execute the CLI with the following options:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </code></pre>
|
||||
        If you want to remove the acl added to the prefixed resource pattern above, we can execute the CLI with the following options:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed</code></pre></li>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed</code></pre></li>
|
||||
|
||||
<li><b>List Acls</b><br>
|
||||
        We can list acls for any resource by specifying the --list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with the following options:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic</code></pre>
|
||||
However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic,
|
||||
e.g. any acls on the topic wildcard '*', or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'</code></pre>
|
||||
However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic as the name of such patterns may not be known.
|
||||
We can list <i>all</i> acls affecting Test-topic by using '--resource-pattern-type match', e.g.
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match</code></pre>
|
||||
<pre><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match</code></pre>
|
||||
This will list acls on all matching literal, wildcard and prefixed resource patterns.</li>
|
||||
|
||||
<li><b>Adding or removing a principal as producer or consumer</b><br>
|
||||
        The most common use cases for acl management are adding/removing a principal as producer or consumer, so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic, we can execute the following command:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic</code></pre>
|
||||
        Similarly, to add User:Bob as a consumer of Test-topic with consumer group Group-1, we just have to pass the --consumer option:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </code></pre>
|
||||
        Note that for the consumer option we must also specify the consumer group.
|
||||
        In order to remove a principal from a producer or consumer role, we just need to pass the --remove option.</li>
|
||||
|
||||
|
@ -1587,10 +1587,10 @@ RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
|
|||
    Users having Alter permission on ClusterResource can use the Admin API for ACL management. The kafka-acls.sh script supports the AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly.
|
||||
    All the above examples can be executed by using the <b>--bootstrap-server</b> option. For example:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
|
||||
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
|
||||
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
|
||||
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:tokenRequester --operation CreateTokens --user-principal "owner1"</code></pre></li>
|
||||
<pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
|
||||
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
|
||||
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
|
||||
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:tokenRequester --operation CreateTokens --user-principal "owner1"</code></pre></li>
|
||||
|
||||
</ul>
|
||||
|
||||
|
@ -2356,42 +2356,42 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminc
|
|||
    When performing an incremental bounce, stop the brokers cleanly via a SIGTERM. It's also good practice to wait for restarted replicas to return to the ISR list before moving on to the next node.
|
||||
<p></p>
|
||||
As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
|
||||
<pre class="line-numbers"><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092</code></pre>
|
||||
<pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092</code></pre>
|
||||
|
||||
We then restart the clients, changing their config to point at the newly opened, secured port:
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">bootstrap.servers = [broker1:9092,...]
|
||||
<pre><code class="language-text">bootstrap.servers = [broker1:9092,...]
|
||||
security.protocol = SSL
|
||||
...etc</code></pre>
|
||||
|
||||
In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
|
||||
<pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
|
||||
security.inter.broker.protocol=SSL</code></pre>
|
||||
|
||||
In the final bounce we secure the cluster by closing the PLAINTEXT port:
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SSL://broker1:9092
|
||||
<pre><code class="language-text">listeners=SSL://broker1:9092
|
||||
security.inter.broker.protocol=SSL</code></pre>
|
||||
|
||||
Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093</code></pre>
|
||||
<pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093</code></pre>
|
||||
|
||||
We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">bootstrap.servers = [broker1:9093,...]
|
||||
<pre><code class="language-text">bootstrap.servers = [broker1:9093,...]
|
||||
security.protocol = SASL_SSL
|
||||
...etc</code></pre>
|
||||
|
||||
The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
|
||||
<pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
|
||||
security.inter.broker.protocol=SSL</code></pre>
|
||||
|
||||
The final bounce secures the cluster by closing the PLAINTEXT port.
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
|
||||
<pre><code class="language-text">listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
|
||||
security.inter.broker.protocol=SSL</code></pre>
|
||||
|
||||
ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section <a href="#zk_authz_migration">7.7.2</a>.
|
||||
|
@ -2455,7 +2455,7 @@ security.inter.broker.protocol=SSL</code></pre>
|
|||
Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication.
|
||||
These configurations are described in the
|
||||
<a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
|
||||
<pre class="line-numbers"><code class="language-text">secureClientPort=2182
|
||||
<pre><code class="language-text">secureClientPort=2182
|
||||
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
|
||||
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
|
||||
ssl.keyStore.location=/path/to/zk/keystore.jks
|
||||
|
@ -2469,7 +2469,7 @@ ssl.trustStore.password=zk-ts-passwd</code></pre>
|
|||
<p>Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
|
||||
These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
|
||||
</p>
|
||||
<pre class="line-numbers"><code class="language-text"># connect to the ZooKeeper port configured for TLS
|
||||
<pre><code class="language-text"># connect to the ZooKeeper port configured for TLS
|
||||
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
|
||||
# required to use TLS to ZooKeeper (default is false)
|
||||
zookeeper.ssl.client.enable=true
|
||||
|
@ -2490,7 +2490,7 @@ zookeeper.set.acl=true</code></pre>
|
|||
If you are running a version of Kafka that does not support security, or simply have security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
|
||||
<ol>
|
||||
<li>Enable SASL and/or mTLS authentication on ZooKeeper. If enabling mTLS, you would now have both a non-TLS port and a TLS port, like this:
|
||||
<pre class="line-numbers"><code class="language-text">clientPort=2181
|
||||
<pre><code class="language-text">clientPort=2181
|
||||
secureClientPort=2182
|
||||
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
|
||||
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
|
||||
|
@ -2513,9 +2513,9 @@ ssl.trustStore.password=zk-ts-passwd</code></pre>
|
|||
<li>If you are disabling mTLS, disable the TLS port in ZooKeeper</li>
|
||||
</ol>
|
||||
Here is an example of how to run the migration tool:
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181</code></pre>
|
||||
<pre><code class="language-bash">$ bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181</code></pre>
|
||||
<p>Run this to see the full list of parameters:</p>
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/zookeeper-security-migration.sh --help</code></pre>
|
||||
<pre><code class="language-bash">$ bin/zookeeper-security-migration.sh --help</code></pre>
|
||||
<h4 class="anchor-heading"><a id="zk_authz_ensemble" class="anchor-link"></a><a href="#zk_authz_ensemble">7.7.3 Migrating the ZooKeeper ensemble</a></h4>
|
||||
It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do so, we need to perform a rolling restart of the servers and set a few properties. See above for mTLS information. Please refer to the ZooKeeper documentation for more detail:
|
||||
<ol>
|
||||
|
@ -2533,7 +2533,7 @@ ssl.trustStore.password=zk-ts-passwd</code></pre>
|
|||
and setting this value to <code>none</code> in ZooKeeper allows clients to connect via a TLS-encrypted connection
|
||||
without presenting their own certificate. Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption.
|
||||
These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
|
||||
<pre class="line-numbers"><code class="language-text"># connect to the ZooKeeper port configured for TLS
|
||||
<pre><code class="language-text"># connect to the ZooKeeper port configured for TLS
|
||||
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
|
||||
# required to use TLS to ZooKeeper (default is false)
|
||||
zookeeper.ssl.client.enable=true
|
||||
|
|
|
@ -78,9 +78,9 @@
|
|||
<h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
|
||||
<p>Invoke the application reset tool from the command line.</p>
|
||||
<p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with <code class="docutils literal"><span class="pre">--dry-run</span></code> to preview your changes before making them.</p>
|
||||
<pre class="line-numbers"><code class="language-bash"><path-to-kafka>/bin/kafka-streams-application-reset</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-streams-application-reset</code></pre>
|
||||
<p>The tool accepts the following parameters:</p>
|
||||
<pre class="line-numbers"><code class="language-text">Option (* = required) Description
|
||||
<pre><code class="language-text">Option (* = required) Description
|
||||
--------------------- -----------
|
||||
* --application-id <String: id> The Kafka Streams application ID
|
||||
(application.id).
|
||||
|
|
|
@ -100,7 +100,7 @@ userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.L
|
|||
<h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
|
||||
<p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
|
||||
its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p>
|
||||
<pre class="line-numbers"><code class="language-xml-doc"><dependency>
|
||||
<pre class="line-numbers"><code class="language-xml"><dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.0</version>
|
||||
|
|
|
@ -81,7 +81,7 @@ stream.filter((k,v) -> !v.equals("invalid_txn"))
|
|||
<p>
|
||||
Running <code>Topology#describe()</code> yields this string:
|
||||
|
||||
<pre class="line-numbers"><code class="language-text">Topologies:
|
||||
<pre><code class="language-text">Topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
|
||||
--> KSTREAM-FILTER-0000000001
|
||||
|
|
|
@ -159,11 +159,10 @@ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);</code></pre>
|
|||
<code class="docutils literal"><span class="pre">rocksdb.config.setter</span></code> configuration.</p>
|
||||
<p>Also, we recommend changing RocksDB's default memory allocator, because the default allocator may lead to increased memory consumption.
|
||||
To change the memory allocator to <code>jemalloc</code>, you need to set the environment variable <code>LD_PRELOAD</code> before you start your Kafka Streams application:</p>
|
||||
<pre class="line-numbers"><code class="language-bash"># example: install jemalloc (on Debian)
|
||||
<pre><code class="language-bash"># example: install jemalloc (on Debian)
|
||||
$ apt install -y libjemalloc-dev
|
||||
# set LD_PRELOAD before you start your Kafka Streams application
|
||||
$ export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so”
|
||||
</code></pre>
|
||||
$ export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so"</code></pre>
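Whether the preload took effect can be spot-checked on a running instance. The sketch below assumes a Linux host and that your application's main class is <code>MyStreamsApp</code> (a placeholder); it looks for jemalloc among the libraries mapped into the JVM process:

<pre><code class="language-bash"># the jemalloc library should appear among the JVM's memory mappings
$ grep jemalloc "/proc/$(pgrep -f MyStreamsApp | head -n 1)/maps"</code></pre>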
|
||||
<p> As of 2.3.0 the memory usage across all instances can be bounded, limiting the total off-heap memory of your Kafka Streams application. To do so you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager">WriteBufferManager</a> and count its memory against the block cache, and then pass the same Cache object to each instance. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB">RocksDB Memory Usage</a> for details. An example RocksDBConfigSetter implementing this is shown below:</p>
|
||||
<pre class="line-numbers"><code class="language-java">public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@
|
|||
<div class="section" id="starting-a-kafka-streams-application">
|
||||
<span id="streams-developer-guide-execution-starting"></span><h2><a class="toc-backref" href="#id3">Starting a Kafka Streams application</a><a class="headerlink" href="#starting-a-kafka-streams-application" title="Permalink to this headline"></a></h2>
|
||||
<p>You can package your Java application as a fat JAR file and then start the application like this:</p>
|
||||
<pre class="line-numbers"><code class="language-bash"># Start the application in class `com.example.MyStreamsApp`
|
||||
<pre><code class="language-bash"># Start the application in class `com.example.MyStreamsApp`
|
||||
# from the fat JAR named `path-to-app-fatjar.jar`.
|
||||
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp</code></pre>
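The fat JAR itself comes out of your build. As a sketch, for a Maven project configured to bundle its dependencies (for example with the Shade or Assembly plugin — an assumption about your build setup), packaging is just:

<pre><code class="language-bash"># compile, run tests and produce the JAR under target/
$ mvn clean package</code></pre>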
|
||||
<p>When you start your application you are launching a Kafka Streams instance of your application. You can run multiple
|
||||
|
|
|
@ -98,7 +98,7 @@
|
|||
then you must also include these SSL certificates in the correct locations within the Docker image.</p>
|
||||
<p>The snippet below shows the settings to enable client authentication and SSL encryption for data-in-transit between your
|
||||
Kafka Streams application and the Kafka cluster it is reading from and writing to:</p>
|
||||
<pre class="line-numbers"><code class="language-bash"># Essential security settings to enable client authentication and SSL encryption
|
||||
<pre><code class="language-bash"># Essential security settings to enable client authentication and SSL encryption
|
||||
bootstrap.servers=kafka.example.com:9093
|
||||
security.protocol=SSL
|
||||
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
|
||||
|
|
|
@ -51,7 +51,7 @@
|
|||
To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular
|
||||
dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven:
|
||||
</p>
|
||||
<pre class="line-numbers"><code class="language-text"><dependency>
|
||||
<pre class="line-numbers"><code class="language-xml"><dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-streams-test-utils</artifactId>
|
||||
<version>{{fullDotVersion}}</version>
|
||||
|
|
|
@ -91,8 +91,8 @@ because it cannot know when it has processed "all" the input data.
|
|||
<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
|
||||
Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
|
||||
> cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
|
||||
<pre><code class="language-bash">$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
|
||||
$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
|
||||
|
||||
<h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
|
||||
|
||||
|
@ -108,13 +108,13 @@ Note that there are multiple downloadable Scala versions and we choose to use th
|
|||
Run the following commands to start all services in the correct order:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/zookeeper-server-start.sh config/zookeeper.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/zookeeper-server-start.sh config/zookeeper.properties</code></pre>
|
||||
|
||||
<p>
|
||||
Open another terminal session and run:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-server-start.sh config/server.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-server-start.sh config/server.properties</code></pre>
|
||||
|
||||
<h5>
|
||||
Kafka with KRaft
|
||||
|
@ -124,19 +124,19 @@ Note that there are multiple downloadable Scala versions and we choose to use th
|
|||
Generate a Cluster UUID
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"</code></pre>
|
||||
<pre><code class="language-bash">$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"</code></pre>
|
||||
|
||||
<p>
|
||||
Format Log Directories
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties</code></pre>
|
||||
|
||||
<p>
|
||||
Start the Kafka Server
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-server-start.sh config/kraft/server.properties</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-server-start.sh config/kraft/server.properties</code></pre>
|
||||
|
||||
<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4>
|
||||
|
||||
|
@ -152,7 +152,7 @@ Or on Windows:
|
|||
|
||||
Next, we create the input topic named <b>streams-plaintext-input</b> and the output topic named <b>streams-wordcount-output</b>:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-topics.sh --create \
|
||||
<pre><code class="language-bash">$ bin/kafka-topics.sh --create \
|
||||
--bootstrap-server localhost:9092 \
|
||||
--replication-factor 1 \
|
||||
--partitions 1 \
|
||||
|
@ -162,7 +162,7 @@ Created topic "streams-plaintext-input".</code></pre>
|
|||
Note: we create the output topic with compaction enabled because the output stream is a changelog stream
|
||||
(cf. <a href="#anchor-changelog-output">explanation of application output</a> below).
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-topics.sh --create \
|
||||
<pre><code class="language-bash">$ bin/kafka-topics.sh --create \
|
||||
--bootstrap-server localhost:9092 \
|
||||
--replication-factor 1 \
|
||||
--partitions 1 \
|
||||
|
@ -172,8 +172,7 @@ Created topic "streams-wordcount-output".</code></pre>
|
|||
|
||||
The created topic can be described with the same <b>kafka-topics</b> tool:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
|
||||
|
||||
<pre><code class="language-bash">$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
|
||||
Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact,segment.bytes=1073741824
|
||||
Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
||||
Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824
|
||||
|
@ -183,7 +182,7 @@ Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:segme
|
|||
|
||||
The following command starts the WordCount demo application:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo</code></pre>
|
||||
|
||||
<p>
|
||||
The demo application will read from the input topic <b>streams-plaintext-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
|
||||
|
@ -193,11 +192,11 @@ Hence there won't be any STDOUT output except log entries as the results are wri
|
|||
|
||||
Now we can start the console producer in a separate terminal to write some input data to this topic:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input</code></pre>
|
||||
|
||||
and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
<pre><code class="language-bash">$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
--topic streams-wordcount-output \
|
||||
--from-beginning \
|
||||
--property print.key=true \
|
||||
|
@ -212,14 +211,14 @@ Now let's write some message with the console producer into the input topic <b>s
|
|||
This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered
|
||||
(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
|
||||
all streams lead to kafka</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
|
||||
>all streams lead to kafka</code></pre>
|
||||
|
||||
<p>
|
||||
This message will be processed by the WordCount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
<pre><code class="language-bash">$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
--topic streams-wordcount-output \
|
||||
--from-beginning \
|
||||
--property print.key=true \
|
||||
|
@ -241,13 +240,13 @@ Now let's continue writing one more message with the console producer into the i
|
|||
Enter the text line "hello kafka streams" and hit <RETURN>.
|
||||
Your terminal should look as follows:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
|
||||
all streams lead to kafka
|
||||
hello kafka streams</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
|
||||
>all streams lead to kafka
|
||||
>hello kafka streams</code></pre>
|
||||
|
||||
In your other terminal, where the console consumer is running, you will observe that the WordCount application wrote new output data:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
<pre><code class="language-bash">$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
--topic streams-wordcount-output \
|
||||
--from-beginning \
|
||||
--property print.key=true \
|
||||
|
@ -269,15 +268,15 @@ Whenever you write further input messages to the input topic, you will observe n
|
|||
representing the most recent word counts as computed by the WordCount application.
|
||||
Let's enter one final input text line "join kafka summit" and hit <RETURN> in the console producer to the input topic <b>streams-plaintext-input</b> before we wrap up this quickstart:
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
|
||||
all streams lead to kafka
|
||||
hello kafka streams
|
||||
join kafka summit</code></pre>
|
||||
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
|
||||
>all streams lead to kafka
|
||||
>hello kafka streams
|
||||
>join kafka summit</code></pre>
|
||||
|
||||
<a name="anchor-changelog-output"></a>
|
||||
The <b>streams-wordcount-output</b> topic will subsequently show the corresponding updated word counts (see last three lines):
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
<pre><code class="language-bash">$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
|
||||
--topic streams-wordcount-output \
|
||||
--from-beginning \
|
||||
--property print.key=true \
|
||||
|
|
|
@ -42,7 +42,7 @@
|
|||
We are going to use the Kafka Streams Maven archetype to create a Streams project structure with the following commands:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">mvn archetype:generate \
|
||||
<pre><code class="language-bash">$ mvn archetype:generate \
|
||||
-DarchetypeGroupId=org.apache.kafka \
|
||||
-DarchetypeArtifactId=streams-quickstart-java \
|
||||
-DarchetypeVersion={{fullDotVersion}} \
|
||||
|
@ -55,7 +55,7 @@
|
|||
Assuming the above parameter values are used, this command will create a project structure that looks like this:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> tree streams-quickstart
|
||||
<pre><code class="language-bash">$ tree streams-quickstart
|
||||
streams-quickstart
|
||||
|-- pom.xml
|
||||
|-- src
|
||||
|
@ -78,8 +78,8 @@
|
|||
Since we are going to start writing such programs from scratch, we can now delete these examples:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> cd streams-quickstart
|
||||
> rm src/main/java/myapps/*.java</code></pre>
|
||||
<pre><code class="language-bash">$ cd streams-quickstart
|
||||
$ rm src/main/java/myapps/*.java</code></pre>
|
||||
|
||||
<h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
|
||||
|
||||
|
@ -165,8 +165,8 @@ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getCla
|
|||
If we just stop here, compile and run the program, it will output the following information:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.Pipe
|
||||
<pre><code class="language-bash">$ mvn clean package
|
||||
$ mvn exec:java -Dexec.mainClass=myapps.Pipe
|
||||
Sub-topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
|
||||
|
@ -273,8 +273,8 @@ public class Pipe {
|
|||
you can run this code in your IDE or on the command line, using Maven:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-brush">> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.Pipe</code></pre>
|
||||
<pre><code class="language-bash">$ mvn clean package
|
||||
$ mvn exec:java -Dexec.mainClass=myapps.Pipe</code></pre>
|
||||
|
||||
<p>
|
||||
For detailed instructions on how to run a Streams application and observe its computing results,
|
||||
|
@ -290,7 +290,7 @@ public class Pipe {
|
|||
We can create another program by first copying the existing <code>Pipe.java</code> class:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-brush">> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java</code></pre>
|
||||
<pre><code class="language-bash">$ cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java</code></pre>
|
||||
|
||||
<p>
|
||||
And change its class name as well as the application id config to distinguish it from the original program:
|
||||
|
@ -342,8 +342,8 @@ source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
|
|||
If we now describe this augmented topology with <code>System.out.println(topology.describe())</code>, we will get the following:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.LineSplit
|
||||
<pre><code class="language-bash">$ mvn clean package
|
||||
$ mvn exec:java -Dexec.mainClass=myapps.LineSplit
|
||||
Sub-topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
|
||||
|
@ -482,8 +482,8 @@ source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault(
|
|||
If we again describe this augmented topology with <code>System.out.println(topology.describe())</code>, we will get the following:
|
||||
</p>
|
||||
|
||||
<pre class="line-numbers"><code class="language-bash">> mvn clean package
|
||||
> mvn exec:java -Dexec.mainClass=myapps.WordCount
|
||||
<pre><code class="language-bash">$ mvn clean package
|
||||
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
|
||||
Sub-topologies:
|
||||
Sub-topology: 0
|
||||
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
|
||||
|
|
|
@ -86,7 +86,7 @@
|
|||
</li>
|
||||
<li>Once the cluster's behavior and performance have been verified, bump the metadata.version by running
|
||||
<code>
|
||||
./bin/kafka-features.sh upgrade --metadata 3.7
|
||||
bin/kafka-features.sh upgrade --metadata 3.7
|
||||
</code>
|
||||
</li>
|
||||
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
|
||||
|
@ -192,7 +192,7 @@
|
|||
</li>
|
||||
<li>Once the cluster's behavior and performance have been verified, bump the metadata.version by running
|
||||
<code>
|
||||
./bin/kafka-features.sh upgrade --metadata 3.6
|
||||
bin/kafka-features.sh upgrade --metadata 3.6
|
||||
</code>
|
||||
</li>
|
||||
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
|
||||
|
@ -334,7 +334,7 @@
|
|||
</li>
|
||||
<li>Once the cluster's behavior and performance have been verified, bump the metadata.version by running
|
||||
<code>
|
||||
./bin/kafka-features.sh upgrade --metadata 3.5
|
||||
bin/kafka-features.sh upgrade --metadata 3.5
|
||||
</code>
|
||||
</li>
|
||||
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
|
||||
|
@ -433,7 +433,7 @@
|
|||
</li>
|
||||
<li>Once the cluster's behavior and performance have been verified, bump the metadata.version by running
|
||||
<code>
|
||||
./bin/kafka-features.sh upgrade --metadata 3.4
|
||||
bin/kafka-features.sh upgrade --metadata 3.4
|
||||
</code>
|
||||
</li>
|
||||
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
|
||||
|
@ -503,7 +503,7 @@
|
|||
</li>
|
||||
<li>Once the cluster's behavior and performance have been verified, bump the metadata.version by running
|
||||
<code>
|
||||
./bin/kafka-features.sh upgrade --metadata 3.3
|
||||
bin/kafka-features.sh upgrade --metadata 3.3
|
||||
</code>
|
||||
</li>
|
||||
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
|
||||
|
@ -596,7 +596,7 @@
|
|||
<a href="https://www.slf4j.org/codes.html#no_tlm">possible compatibility issues originating from the logging framework</a>.</li>
|
||||
<li>The example connectors, <code>FileStreamSourceConnector</code> and <code>FileStreamSinkConnector</code>, have been
|
||||
removed from the default classpath. To use them in Kafka Connect standalone or distributed mode they need to be
|
||||
explicitly added, for example <code>CLASSPATH=./libs/connect-file-3.2.0.jar ./bin/connect-distributed.sh</code>.</li>
|
||||
explicitly added, for example <code>CLASSPATH=./libs/connect-file-3.2.0.jar bin/connect-distributed.sh</code>.</li>
|
||||
</ul>
|
||||
|
||||
<h4><a id="upgrade_3_1_0" href="#upgrade_3_1_0">Upgrading to 3.1.0 from any version 0.8.x through 3.0.x</a></h4>
|
||||