KAFKA-4059; API Design section under Implementation is out of date
It describes the old deprecated clients and it's better to just remove it.

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Tom Bentley <tbentley@redhat.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes #3385 from tombentley/KAFKA-4059
parent a4794b11b2
commit a6799f4e14
@@ -16,134 +16,11 @@
-->
<script id="implementation-template" type="text/x-handlebars-template">

<h3><a id="apidesign" href="#apidesign">5.1 API Design</a></h3>

<h4><a id="impl_producer" href="#impl_producer">Producer APIs</a></h4>

<p>
The Producer API wraps the two low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>.
<pre class="brush: java;">
class Producer<T> {

    /* Sends the data, partitioned by key to the topic using either the */
    /* synchronous or the asynchronous producer */
    public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

    /* Sends a list of data, partitioned by key to the topic using either */
    /* the synchronous or the asynchronous producer */
    public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

    /* Closes the producer and cleans up */
    public void close();

}
</pre>
The goal is to expose all the producer functionality through a single API to the client.

The Kafka producer
<ul>
<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data:
<p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>) before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. Events are buffered in a queue until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</code> interface and setting the <code>callback.handler</code> config parameter to that class.
</p>
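<p>
For illustration, a minimal asynchronous producer configuration using the parameters described above might look like the following (a rough sketch; the <code>ProducerConfig</code> constructor is an assumption here, and the exact property names and defaults should be checked against the release in use):
</p>
<pre class="brush: java;">
Properties props = new Properties();
props.put("producer.type", "async");  // batch requests in the asynchronous producer
props.put("batch.size", "200");       // dispatch once this many events are buffered...
props.put("queue.time", "5000");      // ...or after this many milliseconds, whichever comes first

Producer producer = new Producer(new ProducerConfig(props));  // generic parameters omitted
</pre>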
</li>
<li>handles the serialization of data through a user-specified <code>Encoder</code>:
<pre class="brush: java;">
interface Encoder<T> {
    public Message toMessage(T data);
}
</pre>
<p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code>.</p>
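<p>
For illustration, a custom encoder for plain strings might be written as follows (a minimal sketch; the class name is hypothetical and it assumes <code>Message</code> accepts a byte array):
</p>
<pre class="brush: java;">
class StringEncoder implements Encoder<String> {
    public Message toMessage(String data) {
        // serialize the payload as UTF-8 bytes wrapped in a Message
        return new Message(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
</pre>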
</li>
<li>provides software load balancing through an optionally user-specified <code>Partitioner</code>:
<p>
The routing decision is influenced by the <code>kafka.producer.Partitioner</code>.
<pre class="brush: java;">
interface Partitioner<T> {
    int partition(T key, int numPartitions);
}
</pre>
The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.
</p>
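<p>
For illustration, a custom partitioner implementing the default <code>hash(key)%numPartitions</code> strategy described above could look like this (the class name is hypothetical):
</p>
<pre class="brush: java;">
class HashPartitioner implements Partitioner<String> {
    public int partition(String key, int numPartitions) {
        // mask the sign bit so the result stays in [0, numPartitions)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
</pre>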
</li>
</ul>
</p>

<h4><a id="impl_consumer" href="#impl_consumer">Consumer APIs</a></h4>
<p>
We have two levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.
</p>
<p>
The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).
</p>
<h5><a id="impl_lowlevel" href="#impl_lowlevel">Low-level API</a></h5>
<pre class="brush: java;">
class SimpleConsumer {

    /* Send fetch request to a broker and get back a set of messages. */
    public ByteBufferMessageSet fetch(FetchRequest request);

    /* Send a list of fetch requests to a broker and get back a response set. */
    public MultiFetchResponse multifetch(List<FetchRequest> fetches);

    /**
     * Get a list of valid offsets (up to maxSize) before the given time.
     * The result is a list of offsets, in descending order.
     * @param time: time in millisecs,
     *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
     *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
     */
    public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);

}
</pre>
The low-level API is used to implement the high-level API, and is also used directly by some of our offline consumers which have particular requirements around maintaining state.
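<p>
Because the API is stateless, the caller passes the offset in on every fetch and tracks it itself. A rough sketch of that loop, based on the interface above (the <code>FetchRequest</code> constructor arguments and the helper method are illustrative assumptions, and error handling is omitted):
</p>
<pre class="brush: java;">
long offset = 0L;
while (running) {
    // the consumer supplies the offset explicitly on every request
    ByteBufferMessageSet messages = consumer.fetch(new FetchRequest("my_topic", 0, offset, maxBytes));
    // hypothetical helper: process the returned messages and return the offset to resume from
    offset = processAndReturnNextOffset(messages);
}
</pre>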
<h5><a id="impl_highlevel" href="#impl_highlevel">High-level API</a></h5>
<pre class="brush: java;">
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

    /**
     * This method is used to get a list of KafkaStreams, which are iterators over
     * MessageAndMetadata objects from which you can obtain messages and their
     * associated metadata (currently only topic).
     * Input: a map of <topic, #streams>
     * Output: a map of <topic, list of message streams>
     */
    public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Integer> topicCountMap);

    /**
     * You can also obtain a list of KafkaStreams, that iterate over messages
     * from topics that match a TopicFilter. (A TopicFilter encapsulates a
     * whitelist or a blacklist which is a standard Java regex.)
     */
    public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

    /* Commit the offsets of all messages consumed so far. */
    public void commitOffsets();

    /* Shut down the connector */
    public void shutdown();

}
</pre>
<p>
This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single-threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
</p>
<p>
The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).
</p>
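<p>
Using the connector created above, a minimal usage sketch of the high-level API (iteration details are simplified; <code>process</code> is a hypothetical application callback):
</p>
<pre class="brush: java;">
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("my_topic", 2);    // request two streams for this topic in one call

Map<String, List<KafkaStream>> streams = connector.createMessageStreams(topicCountMap);

for (final KafkaStream stream : streams.get("my_topic")) {
    // each stream is consumed by exactly one thread
    new Thread(new Runnable() {
        public void run() {
            for (Object messageAndMetadata : stream) {   // each element is a MessageAndMetadata
                process(messageAndMetadata);             // hypothetical application-specific handling
            }
        }
    }).start();
}
</pre>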
<h3><a id="networklayer" href="#networklayer">5.2 Network Layer</a></h3>
<h3><a id="networklayer" href="#networklayer">5.1 Network Layer</a></h3>
<p>
The network layer is a fairly straightforward NIO server, and will not be described in great detail. The sendfile implementation is done by giving the <code>MessageSet</code> interface a <code>writeTo</code> method. This allows the file-backed message set to use the more efficient <code>transferTo</code> implementation instead of an in-process buffered write. The threading model is a single acceptor thread and <i>N</i> processor threads which handle a fixed number of connections each. This design has been pretty thoroughly tested <a href="http://sna-projects.com/blog/2009/08/introducing-the-nio-socketserver-implementation">elsewhere</a> and found to be simple to implement and fast. The protocol is kept quite simple to allow for future implementation of clients in other languages.
</p>
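<p>
To illustrate the <code>transferTo</code> approach mentioned above, a file-backed message set's <code>writeTo</code> method can hand bytes straight from the file channel to the socket channel (a simplified sketch, not the actual implementation; <code>fileChannel</code> stands for the channel of the backing log file):
</p>
<pre class="brush: java;">
// Zero-copy transfer: the kernel moves the bytes from the file to the destination channel
// without copying them through an intermediate in-process buffer.
public long writeTo(java.nio.channels.WritableByteChannel destChannel, long offset, long size) throws java.io.IOException {
    return fileChannel.transferTo(offset, size, destChannel);
}
</pre>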
<h3><a id="messages" href="#messages">5.3 Messages</a></h3>
<h3><a id="messages" href="#messages">5.2 Messages</a></h3>
<p>
Messages consist of a fixed-size header, a variable-length opaque key byte array and a variable-length opaque value byte array. The header contains the following fields:
<ul>
@@ -154,7 +31,7 @@
</ul>
Leaving the key and value opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say, a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>MessageSet</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.
<h3><a id="messageformat" href="#messageformat">5.4 Message Format</a></h3>
<h3><a id="messageformat" href="#messageformat">5.3 Message Format</a></h3>

<pre class="brush: java;">
/**
@@ -178,7 +55,7 @@
*/
</pre>
</p>
<h3><a id="log" href="#log">5.5 Log</a></h3>
<h3><a id="log" href="#log">5.4 Log</a></h3>
<p>
A log for a topic named "my_topic" with two partitions consists of two directories (namely <code>my_topic_0</code> and <code>my_topic_1</code>) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries"; each log entry is a 4 byte integer <i>N</i> storing the message length which is followed by the <i>N</i> message bytes. Each message is uniquely identified by a 64-bit integer <i>offset</i> giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly <i>S</i> bytes from the previous file where <i>S</i> is the max log file size given in the configuration.
</p>
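<p>
A rough sketch of reading a single log entry in the on-disk layout described above (a 4 byte length prefix followed by the message bytes; <code>fileChannel</code> and <code>position</code> stand for the log file's channel and the entry's byte offset, and error handling is omitted):
</p>
<pre class="brush: java;">
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
fileChannel.read(sizeBuffer, position);        // read the 4 byte integer N stored at this offset
sizeBuffer.flip();
int messageSize = sizeBuffer.getInt();

ByteBuffer messageBuffer = ByteBuffer.allocate(messageSize);
fileChannel.read(messageBuffer, position + 4); // read the N message bytes that follow
messageBuffer.flip();
</pre>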
@@ -251,7 +128,7 @@
Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is added to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data, so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is written. The CRC detects this corner case, and prevents it from corrupting the log (though the unwritten messages are, of course, lost).
</p>
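<p>
A minimal sketch of how such a CRC check can be performed on a message read back from the log (variable names are illustrative):
</p>
<pre class="brush: java;">
java.util.zip.CRC32 crc = new java.util.zip.CRC32();
crc.update(payload);                            // payload: the message bytes read from the log entry
boolean valid = (crc.getValue() == storedCrc);  // storedCrc: the CRC recorded with the entry
</pre>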
<h3><a id="distributionimpl" href="#distributionimpl">5.6 Distribution</a></h3>
<h3><a id="distributionimpl" href="#distributionimpl">5.5 Distribution</a></h3>
<h4><a id="impl_offsettracking" href="#impl_offsettracking">Consumer Offset Tracking</a></h4>
<p>
The high-level consumer tracks the maximum offset it has consumed in each partition and periodically commits its offset vector so that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for a given consumer group in a designated broker (for that group) called the <i>offset manager</i>; i.e., any consumer instance in that consumer group should send its offset commits and fetches to that offset manager (broker). The high-level consumer handles this automatically. If you use the simple consumer you will need to manage offsets manually. This is currently unsupported in the Java simple consumer, which can only commit or fetch offsets in ZooKeeper. If you use the Scala simple consumer you can discover the offset manager and explicitly commit or fetch offsets to the offset manager. A consumer can look up its offset manager by issuing a GroupCoordinatorRequest to any Kafka broker and reading the GroupCoordinatorResponse, which will contain the offset manager. The consumer can then proceed to commit or fetch offsets from the offset manager broker. In case the offset manager moves, the consumer will need to rediscover the offset manager. If you wish to manage your offsets manually, you can take a look at these <a href="https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka">code samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest</a>.
@@ -68,12 +68,11 @@
</li>
<li><a href="#implementation">5. Implementation</a>
    <ul>
        <li><a href="#apidesign">5.1 API Design</a>
        <li><a href="#networklayer">5.2 Network Layer</a>
        <li><a href="#messages">5.3 Messages</a>
        <li><a href="#messageformat">5.4 Message format</a>
        <li><a href="#log">5.5 Log</a>
        <li><a href="#distributionimpl">5.6 Distribution</a>
        <li><a href="#networklayer">5.1 Network Layer</a>
        <li><a href="#messages">5.2 Messages</a>
        <li><a href="#messageformat">5.3 Message format</a>
        <li><a href="#log">5.4 Log</a>
        <li><a href="#distributionimpl">5.5 Distribution</a>
    </ul>
</li>
<li><a href="#operations">6. Operations</a>