MINOR: Improve Kafka documentation

Improve the documentation by fixing typos and punctuation, and by correcting content.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Grant Henke <granthenke@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #778 from vahidhashemian/typo05/fix_documentation_typos
Vahid Hashemian 2016-01-23 22:26:53 -08:00 committed by Ewen Cheslack-Postava
parent 4f39b5bc5b
commit d00cf520fb
4 changed files with 50 additions and 48 deletions


@ -46,7 +46,7 @@ The new producer -
<p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</code> interface and setting the <code>callback.handler</code> config parameter to that class.
</p>
</li>
<li>handles the serialization of data through a user-specified <code>Encoder</code>:
<pre>
interface Encoder&lt;T&gt; {
public Message toMessage(T data);
@ -54,7 +54,7 @@ interface Encoder&lt;T&gt; {
</pre>
<p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
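As a rough illustration (not taken from Kafka itself), a custom encoder for plain strings could be sketched as follows, assuming the <code>kafka.message.Message</code> byte-array constructor:
<pre>
class StringEncoder implements Encoder&lt;String&gt; {
  // Wrap the UTF-8 bytes of the string in a Kafka Message
  public Message toMessage(String data) {
    return new Message(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
  }
}
</pre>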
</li>
<li>provides software load balancing through an optionally user-specified <code>Partitioner</code>:
<p>
The routing decision is influenced by the <code>kafka.producer.Partitioner</code>.
<pre>
@ -198,13 +198,13 @@ The log allows serial appends which always go to the last file. This file is rol
</p>
<h4><a id="impl_reads" href="#impl_reads">Reads</a></h4>
<p>
Reads are done by giving the 64-bit logical offset of a message and an <i>S</i>-byte max chunk size. This will return an iterator over the messages contained in the <i>S</i>-byte buffer. <i>S</i> is intended to be larger than any single message, but in the event of an abnormally large message, the read can be retried multiple times, each time doubling the buffer size, until the message is read successfully. A maximum message and buffer size can be specified to make the server reject messages larger than some size, and to give a bound to the client on the maximum it needs to ever read to get a complete message. It is likely that the read buffer ends with a partial message; this is easily detected by the size delimiting.
</p>
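The retry-with-doubling described above could be sketched roughly as follows; <code>readChunk</code> and <code>MessageTooLargeException</code> are illustrative names, not actual Kafka APIs:
<pre>
// Start with the configured chunk size and double it until a complete message fits,
// bounded by the server-enforced maximum message size.
java.nio.ByteBuffer readMessageSet(long offset, int initialChunkSize, int maxMessageSize) {
  int chunkSize = initialChunkSize;
  while (true) {
    try {
      return readChunk(offset, chunkSize);  // returns an S-byte buffer of messages
    } catch (MessageTooLargeException e) {
      if (chunkSize >= maxMessageSize)
        throw e;                            // give up: the message exceeds the allowed bound
      chunkSize = Math.min(chunkSize * 2, maxMessageSize);
    }
  }
}
</pre>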
<p>
The actual process of reading from an offset requires first locating the log segment file in which the data is stored, calculating the file-specific offset from the global offset value, and then reading from that file offset. The search is done as a simple binary search variation against an in-memory range maintained for each file.
</p>
<p>
The log provides the capability of getting the most recently written message to allow clients to start subscribing as of "right now". This is also useful in the case the consumer fails to consume its data within its SLA-specified number of days. In this case, when the client attempts to consume a non-existent offset, it is given an OutOfRangeException and can either reset itself or fail as appropriate to the use case.
</p>
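On the consumer side that choice might look conceptually like this sketch; the consume call and reset helpers are purely illustrative names:
<pre>
try {
  consume(offset);
} catch (OutOfRangeException e) {
  // The requested offset has been deleted (or does not exist yet); either jump to a
  // valid offset or fail, whichever is appropriate for the use case.
  offset = resetToEarliest ? earliestAvailableOffset() : latestAvailableOffset();
}
</pre>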
<p> The following is the format of the results sent to the consumer.
@ -237,7 +237,7 @@ Data is deleted one log segment at a time. The log manager allows pluggable dele
The log provides a configuration parameter <i>M</i> which controls the maximum number of messages that are written before forcing a flush to disk. On startup a log recovery process is run that iterates over all messages in the newest log segment and verifies that each message entry is valid. A message entry is valid if the sum of its size and offset is less than the length of the file AND the CRC32 of the message payload matches the CRC stored with the message. In the event corruption is detected the log is truncated to the last valid offset.
</p>
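That validity check can be sketched roughly as follows (the entry accessors are illustrative, not actual Kafka code):
<pre>
// An entry is valid when it fits entirely within the file and its stored CRC
// matches the CRC32 recomputed over the payload.
boolean isValid(MessageEntry entry, long fileLength) {
  if (entry.offset() + entry.size() >= fileLength)
    return false;
  return crc32(entry.payload()) == entry.storedCrc();
}
</pre>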
<p>
Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data, so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is written. The CRC detects this corner case, and prevents it from corrupting the log (though the unwritten messages are, of course, lost).
</p>
<h3><a id="distributionimpl" href="#distributionimpl">5.6 Distribution</a></h3>
@ -285,7 +285,7 @@ When an element in a path is denoted [xyz], that means that the value of xyz is
/brokers/ids/[0...N] --> host:port (ephemeral node)
</pre>
<p>
This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) results in an error.
</p>
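Conceptually the registration is just the creation of an ephemeral znode keyed by the broker id, along the lines of this simplified ZooKeeper-client sketch (the real broker also writes richer metadata and handles retries):
<pre>
import org.apache.zookeeper.*;

// Fails with a NodeExistsException if another live broker has already registered this id.
void registerBroker(ZooKeeper zk, int brokerId, String hostPort) throws Exception {
  zk.create("/brokers/ids/" + brokerId,
            hostPort.getBytes("UTF-8"),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL);  // removed automatically if the broker dies
}
</pre>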
<p>
Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shut down or dies (thus notifying consumers it is no longer available).
@ -324,7 +324,7 @@ Each of the consumers in the group registers under its group and creates a znode
<h4><a id="impl_zkconsumeroffsets" href="#impl_zkconsumeroffsets">Consumer Offsets</a></h4> <h4><a id="impl_zkconsumeroffsets" href="#impl_zkconsumeroffsets">Consumer Offsets</a></h4>
<p> <p>
Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if <code>offsets.storage=zookeeper</code>. This valued is stored in a ZooKeeper directory. Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if <code>offsets.storage=zookeeper</code>.
</p> </p>
<pre> <pre>
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node) /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)


@ -24,7 +24,7 @@ First let's review some basic messaging terminology:
<ul>
<li>Kafka maintains feeds of messages in categories called <i>topics</i>.
<li>We'll call processes that publish messages to a Kafka topic <i>producers</i>.
<li>We'll call processes that subscribe to topics and process the feed of published messages <i>consumers</i>.
<li>Kafka is run as a cluster comprised of one or more servers each of which is called a <i>broker</i>.
</ul>


@ -66,7 +66,7 @@ Kafka does not currently support reducing the number of partitions for a topic o
<h4><a id="basic_ops_restarting" href="#basic_ops_restarting">Graceful shutdown</a></h4> <h4><a id="basic_ops_restarting" href="#basic_ops_restarting">Graceful shutdown</a></h4>
The Kafka cluster will automatically detect any broker shutdown or failure and elect new leaders for the partitions on that machine. This will occur whether a server fails or it is brought down intentionally for maintenance or configuration changes. For the later cases Kafka supports a more graceful mechanism for stoping a server then just killing it. The Kafka cluster will automatically detect any broker shutdown or failure and elect new leaders for the partitions on that machine. This will occur whether a server fails or it is brought down intentionally for maintenance or configuration changes. For the latter cases Kafka supports a more graceful mechanism for stoping a server than just killing it.
When a server is stopped gracefully it has two optimizations it will take advantage of: When a server is stopped gracefully it has two optimizations it will take advantage of:
<ol> <ol>
@ -123,7 +123,7 @@ Combining mirroring with the configuration <code>auto.create.topics.enable=true<
<h4><a id="basic_ops_consumer_lag" href="#basic_ops_consumer_lag">Checking consumer position</a></h4> <h4><a id="basic_ops_consumer_lag" href="#basic_ops_consumer_lag">Checking consumer position</a></h4>
Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named <i>my-group</i> consuming a topic named <i>my-topic</i> would look like this: Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named <i>my-group</i> consuming a topic named <i>my-topic</i> would look like this:
<pre> <pre>
&gt; bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test &gt; bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
Group Topic Pid Offset logSize Lag Owner Group Topic Pid Offset logSize Lag Owner
my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0 my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0
my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0 my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0
@ -146,9 +146,9 @@ The partition reassignment tool can run in 3 mutually exclusive modes -
<h5><a id="basic_ops_automigrate" href="#basic_ops_automigrate">Automatically migrating data to new machines</a></h5> <h5><a id="basic_ops_automigrate" href="#basic_ops_automigrate">Automatically migrating data to new machines</a></h5>
The partition reassignment tool can be used to move some topics off of the current set of brokers to the newly added brokers. This is typically useful while expanding an existing cluster since it is easier to move entire topics to the new set of brokers, than moving one partition at a time. When used to do this, the user should provide a list of topics that should be moved to the new set of brokers and a target list of new brokers. The tool then evenly distributes all partitions for the given list of topics across the new set of brokers. During this move, the replication factor of the topic is kept constant. Effectively the replicas for all partitions for the input list of topics are moved from the old set of brokers to the newly added brokers. The partition reassignment tool can be used to move some topics off of the current set of brokers to the newly added brokers. This is typically useful while expanding an existing cluster since it is easier to move entire topics to the new set of brokers, than moving one partition at a time. When used to do this, the user should provide a list of topics that should be moved to the new set of brokers and a target list of new brokers. The tool then evenly distributes all partitions for the given list of topics across the new set of brokers. During this move, the replication factor of the topic is kept constant. Effectively the replicas for all partitions for the input list of topics are moved from the old set of brokers to the newly added brokers.
<p> <p>
For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will <i>only</i> exist on brokers 5,6 For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will <i>only</i> exist on brokers 5,6.
<p> <p>
Since, the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows- Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows:
<pre> <pre>
> cat topics-to-move.json > cat topics-to-move.json
{"topics": [{"topic": "foo1"}, {"topics": [{"topic": "foo1"},
@ -156,7 +156,7 @@ Since, the tool accepts the input list of topics as a json file, you first need
"version":1 "version":1
} }
</pre> </pre>
Once the json file is ready, use the partition reassignment tool to generate a candidate assignment- Once the json file is ready, use the partition reassignment tool to generate a candidate assignment:
<pre> <pre>
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment Current partition replica assignment
@ -182,7 +182,7 @@ Proposed partition reassignment configuration
}
</pre>
<p>
The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point, the partition movement has not started; it merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to roll back to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows:
<pre>
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
@ -208,7 +208,7 @@ Successfully started reassignment of partitions
}
</pre>
<p>
Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option:
<pre>
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
@ -223,14 +223,14 @@ Reassignment of partition [foo2,2] completed successfully
<h5><a id="basic_ops_partitionassignment" href="#basic_ops_partitionassignment">Custom partition assignment and migration</a></h5> <h5><a id="basic_ops_partitionassignment" href="#basic_ops_partitionassignment">Custom partition assignment and migration</a></h5>
The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. When used in this manner, it is assumed that the user knows the reassignment plan and does not require the tool to generate a candidate reassignment, effectively skipping the --generate step and moving straight to the --execute step The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. When used in this manner, it is assumed that the user knows the reassignment plan and does not require the tool to generate a candidate reassignment, effectively skipping the --generate step and moving straight to the --execute step
<p> <p>
For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3 For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:
<p> <p>
The first step is to hand craft the custom reassignment plan in a json file- The first step is to hand craft the custom reassignment plan in a json file:
<pre> <pre>
> cat custom-reassignment.json > cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]} {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
</pre> </pre>
Then, use the json file with the --execute option to start the reassignment process- Then, use the json file with the --execute option to start the reassignment process:
<pre> <pre>
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment Current partition replica assignment
@ -248,7 +248,7 @@ Successfully started reassignment of partitions
}
</pre>
<p>
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same custom-reassignment.json (used with the --execute option) should be used with the --verify option:
<pre>
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
@ -264,13 +264,13 @@ Increasing the replication factor of an existing partition is easy. Just specify
<p>
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
<p>
The first step is to hand craft the custom reassignment plan in a json file:
<pre>
> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
</pre>
Then, use the json file with the --execute option to start the reassignment process:
<pre>
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
@ -284,13 +284,13 @@ Successfully started reassignment of partitions
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]} "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
</pre> </pre>
<p> <p>
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option:
<pre> <pre>
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment: Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully Reassignment of partition [foo,0] completed successfully
</pre> </pre>
You can also verify the increase in replication factor with the kafka-topics tool- You can also verify the increase in replication factor with the kafka-topics tool:
<pre> <pre>
> bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe > bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs: Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
@ -403,10 +403,12 @@ LinkedIn's tuning looks like this:
</pre>
For reference, here are the stats on one of LinkedIn's busiest clusters (at peak):
<ul>
<li>60 brokers</li>
<li>50k partitions (replication factor 2)</li>
<li>800k messages/sec in</li>
<li>300 MB/sec inbound, 1 GB/sec+ outbound</li>
</ul>
The tuning looks fairly aggressive, but all of the brokers in that cluster have a 90% GC pause time of about 21ms, and they're doing less than 1 young GC per second.
@ -415,7 +417,7 @@ We are using dual quad-core Intel Xeon machines with 24GB of memory.
<p>
You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
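For example, at roughly 300 MB/sec of inbound writes (the LinkedIn figure above), this rule of thumb works out to about 300 MB/sec * 30 s = 9 GB of memory for buffering.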
<p>
The disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks is better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).
<h4><a id="os" href="#os">OS</a></h4>
Kafka should run well on any unix system and has been tested on Linux and Solaris.
@ -440,7 +442,7 @@ RAID can potentially do better at balancing load between disks (although it does
Another potential benefit of RAID is the ability to tolerate disk failures. However, our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
<h4><a id="appvsosflush" href="#appvsosflush">Application vs. OS Flush Management</a></h4>
Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written. There are several choices in this configuration.
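For example, the broker settings <code>log.flush.interval.messages</code> and <code>log.flush.interval.ms</code> set these message-count and time-based limits, respectively.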
<p>
Kafka must eventually call fsync to know that data was flushed. When recovering from a crash, for any log segment not known to be fsync'd, Kafka will check the integrity of each message by checking its CRC and also rebuild the accompanying offset index file as part of the recovery process executed on startup.
<p>
@ -448,7 +450,7 @@ Note that durability in Kafka does not require syncing data to disk, as a failed
<p>
We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka's own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both, and application-level fsync policies are still supported.
<p>
The drawback of using application-level flush settings is that it is less efficient in its disk usage pattern (it gives the OS less leeway to re-order writes) and it can introduce latency, as fsync in most Linux filesystems blocks writes to the file whereas the background flushing does much more granular page-level locking.
<p>
In general you don't need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.
@ -487,9 +489,9 @@ It is not necessary to tune these settings, however those wanting to optimize pe
Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
<p>
The easiest way to see the available metrics is to fire up jconsole and point it at a running kafka client or server; this will allow browsing all metrics with JMX.
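The same metrics can also be read programmatically over JMX; a minimal sketch, assuming remote JMX was enabled at startup (e.g. by setting <code>JMX_PORT=9999</code> for the Kafka scripts):
<pre>
import java.util.Set;
import javax.management.*;
import javax.management.remote.*;

public class ListKafkaMbeans {
  public static void main(String[] args) throws Exception {
    JMXServiceURL url =
        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbs = connector.getMBeanServerConnection();
      // List every MBean in the kafka.* domains, e.g. kafka.server, kafka.producer, ...
      Set&lt;ObjectName&gt; names = mbs.queryNames(new ObjectName("kafka*:*"), null);
      for (ObjectName name : names)
        System.out.println(name);
    }
  }
}
</pre>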
<p>
We do graphing and alerting on the following metrics:
<table class="data-table">
<tbody><tr>
<th>Description</th>
@ -644,7 +646,7 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>waiting-threads</td>
<td>The number of user threads blocked waiting for buffer memory to enqueue their records.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
@ -684,17 +686,17 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>record-queue-time-max</td>
<td>The maximum time in ms record batches spent in the record accumulator.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>request-latency-avg</td>
<td>The average request latency in ms.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>request-latency-max</td>
<td>The maximum request latency in ms.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
@ -709,22 +711,22 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>record-retry-rate</td>
<td>The average per-second number of retried record sends.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-error-rate</td>
<td>The average per-second number of record sends that resulted in errors.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-size-max</td>
<td>The maximum record size.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-size-avg</td>
<td>The average record size.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
@ -774,7 +776,7 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>incoming-byte-rate</td>
<td>Bytes/second read off all sockets.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
@ -784,7 +786,7 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>select-rate</td>
<td>Number of times the I/O layer checked for new I/O to perform per second.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
@ -804,7 +806,7 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>io-ratio</td>
<td>The fraction of time the I/O thread spent doing I/O.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
@ -869,7 +871,7 @@ The following metrics are available on new producer instances.
</tr>
<tr>
<td>record-retry-rate</td>
<td>The average per-second number of retried record sends for a topic.</td>
<td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
@ -889,9 +891,9 @@ The following metrics are available on new producer instances.
</tr>
</tbody></table>
We recommend monitoring GC time and other JVM stats, as well as various server stats such as CPU utilization, I/O service time, etc.
On the client side, we recommend monitoring the message/byte rate (global and per topic), request rate/size/time, and on the consumer side, max lag in messages among all partitions and min fetch request rate. For a consumer to keep up, max lag needs to be less than a threshold and min fetch rate needs to be larger than 0.
<h4><a id="basic_ops_audit" href="#basic_ops_audit">Audit</a></h4> <h4><a id="basic_ops_audit" href="#basic_ops_audit">Audit</a></h4>
The final alerting we do is on the correctness of the data delivery. We audit that every message that is sent is consumed by all consumers and measure the lag for this to occur. For important topics we alert if a certain completeness is not achieved in a certain time period. The details of this are discussed in KAFKA-260. The final alerting we do is on the correctness of the data delivery. We audit that every message that is sent is consumed by all consumers and measure the lag for this to occur. For important topics we alert if a certain completeness is not achieved in a certain time period. The details of this are discussed in KAFKA-260.


@ -149,10 +149,10 @@ Apache Kafka allows clients to connect over SSL. By default SSL is disabled but
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com</pre>
If the certificate does not show up or if there are any other error messages, then your keystore is not set up properly.</li>
<li><h4><a id="security_configclients" href="#security_configclients">Configuring Kafka Clients</a></h4>
SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported. The configs for SSL will be the same for both producer and consumer.<br>
If client authentication is not required in the broker, then the following is a minimal configuration example:
<pre>
security.protocol=SSL