KAFKA-2478: Fix manual committing example in javadoc

Committing before inserting all records into the database
might lead to some records being lost.

I've changed the example to commit only after all records
returned by `poll` are inserted into the database.

Author: Dmitry Stratiychuk <dstratiychuk@yammer-inc.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #210 from shtratos/KAFKA-2478
This commit is contained in:
Dmitry Stratiychuk 2016-01-26 11:04:58 -08:00 committed by Guozhang Wang
parent 5ae97196ae
commit 82c2191490
1 changed files with 9 additions and 9 deletions

View File

@ -145,7 +145,7 @@ import java.util.regex.Pattern;
* props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
* props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
* KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
* while (true) {
* ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
@ -200,19 +200,19 @@ import java.util.regex.Pattern;
* props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
* props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
* KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
* int commitInterval = 200;
* List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;ConsumerRecord&lt;String, String&gt;&gt;();
* final int minBatchSize = 200;
* List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;&gt;();
* while (true) {
* ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
* for (ConsumerRecord&lt;String, String&gt; record : records) {
* buffer.add(record);
* if (buffer.size() &gt;= commitInterval) {
* insertIntoDb(buffer);
* consumer.commitSync();
* buffer.clear();
* }
* }
* if (buffer.size() &gt;= minBatchSize) {
* insertIntoDb(buffer);
* consumer.commitSync();
* buffer.clear();
* }
* }
* </pre>