KAFKA-18900: Implement share.acknowledgement.mode to choose acknowledgement mode (#19417)

Choose the acknowledgement mode based on the config
(`share.acknowledgement.mode`) and not on the basis of how the user
designs the application.
- The default value of the config is `IMPLICIT`, so if any
empty/null/invalid value is configured, then the mode defaults to
`IMPLICIT`.
- Removed AcknowledgementModes `UNKNOWN` and `PENDING` as they are no
longer required.
- Added code to ensure if the application has any unacknowledged records
in a batch in "`explicit`" mode, then it will throw an
`IllegalStateException`. The expectation is if the mode is "explicit",
all the records received in that `poll()` would be acknowledged before
the next call to `poll()`.
- Modified the `ConsoleShareConsumer` to configure the mode to
"explicit" as it was using the explicit mode of acknowledging records.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-04-15 11:38:33 -04:00 committed by GitHub
parent 6c3995b954
commit f737ef31d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 377 additions and 205 deletions

View File

@ -135,6 +135,8 @@ public class ShareConsumerTest {
private List<TopicPartition> sgsTopicPartitions;
private static final String KEY = "content-type";
private static final String VALUE = "application/octet-stream";
private static final String EXPLICIT = "explicit";
private static final String IMPLICIT = "implicit";
public ShareConsumerTest(ClusterInstance cluster) {
this.cluster = cluster;
@ -594,7 +596,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeSuccess() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -615,7 +617,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeCommitSuccess() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -638,7 +640,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgementCommitAsync() throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1");
ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
@ -663,15 +665,16 @@ public class ShareConsumerTest {
// Acknowledging 2 out of the 3 records received via commitAsync.
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer1.acknowledge(firstRecord);
shareConsumer1.acknowledge(secondRecord);
shareConsumer1.acknowledge(thirdRecord, AcknowledgeType.RELEASE);
shareConsumer1.commitAsync();
// The 3rd record should be reassigned to 2nd consumer when it polls, kept higher wait time
// as time out for locks is 15 secs.
// The 3rd record should be reassigned to 2nd consumer when it polls.
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(1000));
return records2.count() == 1 && records2.iterator().next().offset() == 2L;
@ -690,51 +693,16 @@ public class ShareConsumerTest {
}
}
@ClusterTest
public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
shareConsumer.subscribe(Set.of(tp.topic()));
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, Map.of()));
// The acknowledgement mode moves to PENDING from UNKNOWN.
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(0, records.count());
shareConsumer.commitAsync();
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.flush();
// The acknowledgement mode remains in PENDING because no records were returned.
records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
// The acknowledgement mode now moves to EXPLICIT.
shareConsumer.acknowledge(records.iterator().next());
shareConsumer.commitAsync();
TestUtils.waitForCondition(() -> {
shareConsumer.poll(Duration.ofMillis(500));
return partitionOffsetsMap1.containsKey(tp);
}, 30000, 100L, () -> "Didn't receive call to callback");
verifyShareGroupStateTopicRecordsProduced();
}
}
@ClusterTest
public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record4 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.send(record3);
@ -753,6 +721,7 @@ public class ShareConsumerTest {
// Acknowledging 2 out of the 3 records received via commitAsync.
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
@ -760,21 +729,25 @@ public class ShareConsumerTest {
shareConsumer1.acknowledge(secondRecord);
shareConsumer1.commitAsync();
// The 3rd record should be re-presented to the consumer when it polls again.
producer.send(record4);
producer.flush();
// The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record.
// In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch.
assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000)));
// Acknowledging the 3rd record
shareConsumer1.acknowledge(thirdRecord);
shareConsumer1.commitAsync();
// The next poll() will not throw an exception, it would continue to fetch more records.
records = shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
iterator = records.iterator();
firstRecord = iterator.next();
assertEquals(2L, firstRecord.offset());
ConsumerRecord<byte[], byte[]> fourthRecord = iterator.next();
assertEquals(3L, fourthRecord.offset());
// And poll again without acknowledging - the callback will receive the acknowledgement responses too
records = shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
iterator = records.iterator();
firstRecord = iterator.next();
assertEquals(2L, firstRecord.offset());
shareConsumer1.acknowledge(firstRecord);
shareConsumer1.acknowledge(fourthRecord);
// The callback will receive the acknowledgement responses after polling. The callback is
// called on entry to the poll method or during close. The commit is being performed asynchronously, so
@ -784,6 +757,7 @@ public class ShareConsumerTest {
shareConsumer1.close();
assertFalse(partitionExceptionMap.containsKey(tp));
assertTrue(partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.get(tp).size() == 4);
verifyShareGroupStateTopicRecordsProduced();
}
}
@ -792,7 +766,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeReleasePollAccept() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -815,7 +789,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeReleaseAccept() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -835,7 +809,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeReleaseClose() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -853,7 +827,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeThrowsNotInBatch() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -970,7 +944,7 @@ public class ShareConsumerTest {
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -995,7 +969,7 @@ public class ShareConsumerTest {
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "implicit"))) {
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, IMPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
@ -2069,7 +2043,7 @@ public class ShareConsumerTest {
cluster.bootstrapServers(),
topicName,
groupId,
Map.of()
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)
);
service.schedule(
@ -2145,7 +2119,8 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset("group1", "earliest");
alterShareIsolationLevel("group1", "read_uncommitted");
try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
shareConsumer.subscribe(Set.of(tp.topic()));
transactionalProducer.initTransactions();
try {
@ -2231,7 +2206,8 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset("group1", "earliest");
alterShareIsolationLevel("group1", "read_committed");
try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
shareConsumer.subscribe(Set.of(tp.topic()));
transactionalProducer.initTransactions();
@ -2282,7 +2258,11 @@ public class ShareConsumerTest {
// Wait for the aborted marker offset for Message 4 (7L) to be fetched and acknowledged by the consumer.
TestUtils.waitForCondition(() -> {
shareConsumer.poll(Duration.ofMillis(500));
ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(500));
if (pollRecords.count() > 0) {
// We will release Message 3 again if it was received in this poll().
pollRecords.forEach(consumerRecord -> shareConsumer.acknowledge(consumerRecord, AcknowledgeType.RELEASE));
}
return partitionOffsetsMap2.containsKey(tp) && partitionOffsetsMap2.get(tp).contains(7L);
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort transaction marker offset for Message 4");
@ -2299,7 +2279,7 @@ public class ShareConsumerTest {
produceCommittedTransaction(transactionalProducer, "Message 8");
// Since isolation level is READ_UNCOMMITTED, we can consume Message 3 (committed transaction that was released), Message 5, Message 6, Message 7 and Message 8.
List<String> finalMessages = new ArrayList<>();
Set<String> finalMessages = new HashSet<>();
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(5000));
if (pollRecords.count() > 0) {
@ -2311,11 +2291,8 @@ public class ShareConsumerTest {
return finalMessages.size() == 5;
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all records post altering share isolation level");
assertEquals("Message 3", finalMessages.get(0));
assertEquals("Message 5", finalMessages.get(1));
assertEquals("Message 6", finalMessages.get(2));
assertEquals("Message 7", finalMessages.get(3));
assertEquals("Message 8", finalMessages.get(4));
Set<String> expected = Set.of("Message 3", "Message 5", "Message 6", "Message 7", "Message 8");
assertEquals(expected, finalMessages);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
@ -2330,7 +2307,8 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset("group1", "earliest");
alterShareIsolationLevel("group1", "read_committed");
try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
shareConsumer.subscribe(Set.of(tp.topic()));
transactionalProducer.initTransactions();

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@ -381,13 +382,10 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
/**
* <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode
* for share consumers. It will be removed or converted to a proper configuration before release.
* An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
* <code>share.acknowledgement.mode</code>
*/
public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "internal.share.acknowledgement.mode";
private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
" If unset, the acknowledgement mode of the consumer is decided by the method calls it uses to fetch and commit." +
public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "share.acknowledgement.mode";
private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
" If set to <code>implicit</code>, the acknowledgement mode of the consumer is implicit and it must not" +
" use <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records. Instead," +
" delivery is acknowledged implicitly on the next call to poll or commit." +
@ -401,7 +399,7 @@ public class ConsumerConfig extends AbstractConfig {
*/
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
GROUP_REMOTE_ASSIGNOR_CONFIG,
INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
);
/**
@ -411,7 +409,7 @@ public class ConsumerConfig extends AbstractConfig {
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
HEARTBEAT_INTERVAL_MS_CONFIG,
SESSION_TIMEOUT_MS_CONFIG,
INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
);
static {
@ -695,12 +693,12 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
Type.STRING,
null,
in(null, "implicit", "explicit"),
ShareAcknowledgementMode.IMPLICIT.name(),
new ShareAcknowledgementMode.Validator(),
Importance.MEDIUM,
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC);
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC);
}
@Override

View File

@ -116,33 +116,33 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
* <p>
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
* <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
* it is using <em>explicit acknowledgement</em>. In this case:
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
* consumer {@code share.acknowledgement.mode} property.
* <p>
* If the application sets the property to "implicit" or does not set it at all, then the consumer is using
* <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by:
* <ul>
* <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka.
* If any records in the batch were not acknowledged, they remain acquired and will be presented to the application
* in response to a future poll.</li>
* <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to
* Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement.
* If any records in the batch were not acknowledged, they remain acquired and will be presented to the application
* in response to a future poll.</li>
* <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and
* releases any remaining acquired records.</li>
* </ul>
* If the application does not call {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
* it is using <em>implicit acknowledgement</em>. In this case:
* <ul>
* <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of
* the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
* <li>The application calls {@link #poll(Duration)} without committing, which also implicitly acknowledges all of
* <li>Calling {@link #poll(Duration)} without committing, which also implicitly acknowledges all
* the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is
* thrown by a failure to commit the acknowledgements.</li>
* <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li>
* <li>Calling {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all
* the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
* <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li>
* </ul>
* If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment</em>.
* The application must acknowledge all records returned from {@link #poll(Duration)} using
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}.
* If the application calls {@link #poll(Duration)} without having acknowledged all records, an
* {@link IllegalStateException} is thrown. The remaining unacknowledged records can still be acknowledged.
* In this mode, the application acknowledges delivery by:
* <ul>
* <li>Calling {@link #poll(Duration)} after it has acknowledged all records, which commits the acknowledgements
* to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.</li>
* <li>Calling {@link #commitSync()} or {@link #commitAsync()} which commits any pending
* acknowledgements to Kafka.</li>
* <li>Calling {@link #close()} which attempts to commit any pending acknowledgements and releases
* any remaining acquired records.</li>
* </ul>
* <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose
* between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required.
* <p>
* The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition
* are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records
* in a batch are performed atomically. This makes error handling significantly more straightforward because there can be
@ -195,12 +195,14 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
*
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
* This example demonstrates using different acknowledgement types depending on the outcome of processing the records.
* Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record.
* <pre>
* Properties props = new Properties();
* props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;share.acknowledgement.mode&quot;, &quot;explicit&quot;);
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
@ -227,42 +229,6 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* It is only once {@link #commitSync()} is called that the acknowledgements are committed by sending the new state
* information to Kafka.
*
* <h4>Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)</h4>
* This example demonstrates ending processing of a batch of records on the first error.
* <pre>
* Properties props = new Properties();
* props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
* ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));
* for (ConsumerRecord&lt;String, String&gt; record : records) {
* try {
* doProcessing(record);
* consumer.acknowledge(record, AcknowledgeType.ACCEPT);
* } catch (Exception e) {
* consumer.acknowledge(record, AcknowledgeType.REJECT);
* break;
* }
* }
* consumer.commitSync();
* }
* </pre>
* There are the following cases in this example:
* <ol>
* <li>The batch contains no records, in which case the application just polls again. The call to {@link #commitSync()}
* just does nothing because the batch was empty.</li>
* <li>All of the records in the batch are processed successfully. The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}
* specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch as successfully processed.</li>
* <li>One of the records encounters an exception. The call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} specifying
* {@code AcknowledgeType.REJECT} rejects that record. Earlier records in the batch have already been marked as successfully
* processed. The call to {@link #commitSync()} commits the acknowledgements, but the records after the failed record
* remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.</li>
* </ol>
*
* <h3>Reading Transactional Records</h3>
* The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}</code>
* configuration property. In a share group, the isolation level applies to the entire share group, not just individual

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
public class ShareAcknowledgementMode {
public enum AcknowledgementMode {
IMPLICIT, EXPLICIT;
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}
private final AcknowledgementMode acknowledgementMode;
public static final ShareAcknowledgementMode IMPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT);
public static final ShareAcknowledgementMode EXPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT);
private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) {
this.acknowledgementMode = acknowledgementMode;
}
/**
* Returns the ShareAcknowledgementMode from the given string.
*/
public static ShareAcknowledgementMode fromString(String acknowledgementMode) {
if (acknowledgementMode == null) {
throw new IllegalArgumentException("Acknowledgement mode is null");
}
if (Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode)) {
AcknowledgementMode mode = AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT));
switch (mode) {
case IMPLICIT:
return IMPLICIT;
case EXPLICIT:
return EXPLICIT;
default:
throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode);
}
} else {
throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode);
}
}
/**
* Returns the name of the acknowledgement mode.
*/
public String name() {
return acknowledgementMode.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareAcknowledgementMode that = (ShareAcknowledgementMode) o;
return acknowledgementMode == that.acknowledgementMode;
}
@Override
public int hashCode() {
return Objects.hash(acknowledgementMode);
}
@Override
public String toString() {
return "ShareAcknowledgementMode{" +
"mode=" + acknowledgementMode +
'}';
}
public static class Validator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
String acknowledgementMode = (String) value;
try {
fromString(acknowledgementMode);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid value `" + acknowledgementMode + "` for configuration " +
name + ". The value must either be 'implicit' or 'explicit'.");
}
}
@Override
public String toString() {
String values = Arrays.stream(ShareAcknowledgementMode.AcknowledgementMode.values())
.map(ShareAcknowledgementMode.AcknowledgementMode::toString).collect(Collectors.joining(", "));
return "[" + values + "]";
}
}
}

View File

@ -176,20 +176,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
private ShareFetch<K, V> currentFetch;
private AcknowledgementCommitCallbackHandler acknowledgementCommitCallbackHandler;
private final List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements;
private enum AcknowledgementMode {
/** Acknowledgement mode is not yet known */
UNKNOWN,
/** Acknowledgement mode is pending, meaning that {@link #poll(Duration)} has been called once and
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)} has not been called */
PENDING,
/** Acknowledgements are explicit, using {@link #acknowledge(ConsumerRecord, AcknowledgeType)} */
EXPLICIT,
/** Acknowledgements are implicit, not using {@link #acknowledge(ConsumerRecord, AcknowledgeType)} */
IMPLICIT
}
private AcknowledgementMode acknowledgementMode;
private final ShareAcknowledgementMode acknowledgementMode;
/**
* A thread-safe {@link ShareFetchBuffer fetch buffer} for the results that are populated in the
@ -457,7 +444,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
final ConsumerMetadata metadata,
final int requestTimeoutMs,
final int defaultApiTimeoutMs,
final String groupId) {
final String groupId,
final String acknowledgementModeConfig) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
@ -472,7 +460,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
this.metadata = metadata;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.acknowledgementMode = initializeAcknowledgementMode(null, log);
this.acknowledgementMode = ShareAcknowledgementMode.fromString(acknowledgementModeConfig);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.applicationEventHandler = applicationEventHandler;
@ -582,7 +570,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(true);
acknowledgeBatchIfImplicitAcknowledgement();
kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs());
@ -674,6 +662,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
}
if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
// We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
throw new IllegalStateException("All records must be acknowledged in explicit acknowledgement mode.");
}
return currentFetch;
}
}
@ -719,7 +711,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(false);
acknowledgeBatchIfImplicitAcknowledgement();
Timer requestTimer = time.timer(timeout.toMillis());
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
@ -763,7 +755,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(false);
acknowledgeBatchIfImplicitAcknowledgement();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
if (!acknowledgementsMap.isEmpty()) {
@ -1040,29 +1032,11 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
}
/**
* Called to progressively move the acknowledgement mode into IMPLICIT if it is not known to be EXPLICIT.
* If the acknowledgement mode is IMPLICIT, acknowledges all records in the current batch.
*
* @param calledOnPoll If true, called on poll. Otherwise, called on commit.
*/
private void acknowledgeBatchIfImplicitAcknowledgement(boolean calledOnPoll) {
if (calledOnPoll) {
if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
// The first call to poll(Duration) moves into PENDING
acknowledgementMode = AcknowledgementMode.PENDING;
} else if (acknowledgementMode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) {
// If there are records to acknowledge and PENDING, moves into IMPLICIT
acknowledgementMode = AcknowledgementMode.IMPLICIT;
}
} else {
// If there are records to acknowledge and PENDING, moves into IMPLICIT
if (acknowledgementMode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) {
acknowledgementMode = AcknowledgementMode.IMPLICIT;
}
}
private void acknowledgeBatchIfImplicitAcknowledgement() {
// If IMPLICIT, acknowledge all records
if (acknowledgementMode == AcknowledgementMode.IMPLICIT) {
if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) {
currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT);
}
}
@ -1075,36 +1049,20 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
}
/**
* Called to move the acknowledgement mode into EXPLICIT, if it is not known to be IMPLICIT.
* Called to verify if the acknowledgement mode is EXPLICIT, else throws an exception.
*/
private void ensureExplicitAcknowledgement() {
if (acknowledgementMode == AcknowledgementMode.PENDING) {
// If poll(Duration) has been called once, moves into EXPLICIT
acknowledgementMode = AcknowledgementMode.EXPLICIT;
} else if (acknowledgementMode == AcknowledgementMode.IMPLICIT) {
if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) {
throw new IllegalStateException("Implicit acknowledgement of delivery is being used.");
} else if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
throw new IllegalStateException("Acknowledge called before poll.");
}
}
/**
* Initializes the acknowledgement mode based on the configuration.
*/
private static AcknowledgementMode initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
if (config == null) {
return AcknowledgementMode.UNKNOWN;
}
String acknowledgementModeStr = config.getString(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
if ((acknowledgementModeStr == null) || acknowledgementModeStr.isEmpty()) {
return AcknowledgementMode.UNKNOWN;
} else if (acknowledgementModeStr.equalsIgnoreCase("implicit")) {
return AcknowledgementMode.IMPLICIT;
} else if (acknowledgementModeStr.equalsIgnoreCase("explicit")) {
return AcknowledgementMode.EXPLICIT;
}
log.warn("Invalid value for config {}: \"{}\"", ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, acknowledgementModeStr);
return AcknowledgementMode.UNKNOWN;
private static ShareAcknowledgementMode initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
String s = config.getString(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
return ShareAcknowledgementMode.fromString(s);
}
/**

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
class ShareAcknowledgementModeTest {
@Test
public void testFromString() {
assertEquals(ShareAcknowledgementMode.IMPLICIT, ShareAcknowledgementMode.fromString("implicit"));
assertEquals(ShareAcknowledgementMode.EXPLICIT, ShareAcknowledgementMode.fromString("explicit"));
assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("invalid"));
assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("IMPLICIT"));
assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString("EXPLICIT"));
assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString(""));
assertThrows(IllegalArgumentException.class, () -> ShareAcknowledgementMode.fromString(null));
}
@Test
public void testValidator() {
ShareAcknowledgementMode.Validator validator = new ShareAcknowledgementMode.Validator();
assertDoesNotThrow(() -> validator.ensureValid("test", "implicit"));
assertDoesNotThrow(() -> validator.ensureValid("test", "explicit"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "IMPLICIT"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "EXPLICIT"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", ""));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", null));
}
@Test
public void testEqualsAndHashCode() {
ShareAcknowledgementMode mode1 = ShareAcknowledgementMode.IMPLICIT;
ShareAcknowledgementMode mode2 = ShareAcknowledgementMode.IMPLICIT;
ShareAcknowledgementMode mode3 = ShareAcknowledgementMode.EXPLICIT;
assertEquals(mode1, mode2);
assertNotEquals(mode1, mode3);
assertNotEquals(mode2, mode3);
assertEquals(mode1.hashCode(), mode2.hashCode());
assertNotEquals(mode1.hashCode(), mode3.hashCode());
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
@ -56,6 +57,7 @@ import org.mockito.Mockito;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -145,14 +147,16 @@ public class ShareConsumerImplTest {
mock(ShareFetchBuffer.class),
subscriptions,
"group-id",
"client-id");
"client-id",
"implicit");
}
private ShareConsumerImpl<String, String> newConsumer(
ShareFetchBuffer fetchBuffer,
SubscriptionState subscriptions,
String groupId,
String clientId
String clientId,
String acknowledgementMode
) {
final int defaultApiTimeoutMs = 1000;
final int requestTimeoutMs = 30000;
@ -173,7 +177,8 @@ public class ShareConsumerImplTest {
metadata,
requestTimeoutMs,
defaultApiTimeoutMs,
groupId
groupId,
acknowledgementMode
);
}
@ -345,6 +350,84 @@ public class ShareConsumerImplTest {
assertDoesNotThrow(() -> consumer.close());
}
@Test
public void testExplicitModeUnacknowledgedRecords() {
// Setup consumer with explicit acknowledgement mode
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(
mock(ShareFetchBuffer.class),
subscriptions,
"group-id",
"client-id",
"explicit");
// Setup test data
String topic = "test-topic";
int partition = 0;
TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), partition, topic);
ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, tip);
batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1", "value1"));
batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2", "value2"));
// Setup first fetch to return records
ShareFetch<String, String> firstFetch = ShareFetch.empty();
firstFetch.add(tip, batch);
doReturn(firstFetch)
.doReturn(ShareFetch.empty())
.when(fetchCollector)
.collect(any(ShareFetchBuffer.class));
// Setup subscription
List<String> topics = Collections.singletonList(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics);
consumer.subscribe(topics);
// First poll should succeed and return records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
assertEquals(2, records.count(), "Should have received 2 records");
// Second poll should fail because records weren't acknowledged
IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> consumer.poll(Duration.ofMillis(100))
);
assertTrue(
exception.getMessage().contains("All records must be acknowledged in explicit acknowledgement mode."),
"Unexpected error message: " + exception.getMessage()
);
// Verify that acknowledging one record but not all still throws exception
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
consumer.acknowledge(iterator.next());
exception = assertThrows(
IllegalStateException.class,
() -> consumer.poll(Duration.ofMillis(100))
);
assertTrue(
exception.getMessage().contains("All records must be acknowledged in explicit acknowledgement mode."),
"Unexpected error message: " + exception.getMessage()
);
// Verify that after acknowledging all records, poll succeeds
consumer.acknowledge(iterator.next());
// Setup second fetch to return new records
ShareFetch<String, String> secondFetch = ShareFetch.empty();
ShareInFlightBatch<String, String> newBatch = new ShareInFlightBatch<>(2, tip);
newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3", "value3"));
newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4", "value4"));
secondFetch.add(tip, newBatch);
// Reset mock to return new records
doReturn(secondFetch)
.when(fetchCollector)
.collect(any(ShareFetchBuffer.class));
// Verify that poll succeeds and returns new records
ConsumerRecords<String, String> newRecords = consumer.poll(Duration.ofMillis(100));
assertEquals(2, newRecords.count(), "Should have received 2 new records");
}
@Test
public void testCloseWithTopicAuthorizationException() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
@ -36,6 +37,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@ -66,7 +68,11 @@ public class ConsoleShareConsumer {
messageCount = 0;
long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
Properties consumerProps = opts.consumerProps();
// Set share acknowledgement mode to explicit.
consumerProps.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts.topicArg(), consumer, timeoutMs);
addShutdownHook(consumerWrapper);