mirror of https://github.com/apache/kafka.git
KAFKA-4234; Revert automatic offset commit behavior in consumer's `unsubscribe()`
Temporarily disable the offset commit (when auto commit is enabled) in the new consumer's `unsubscribe()` method towards a workaround for the issue reported in [KAFKA-3491](https://issues.apache.org/jira/browse/KAFKA-3491). For now, a call to `unsubscribe()` can be made to reset the offsets in case processing the batch received from the most recent `poll()` is interrupted (due to some exception). Author: Vahid Hashemian <vahidhashemian@us.ibm.com> Reviewers: Jason Gustafson <jason@confluent.io> Closes #1944 from vahidhashemian/KAFKA-4234
This commit is contained in:
parent
095475d481
commit
20322446aa
|
|
@ -874,10 +874,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
||||||
public void unsubscribe() {
|
public void unsubscribe() {
|
||||||
acquire();
|
acquire();
|
||||||
try {
|
try {
|
||||||
// make sure the offsets of topic partitions the consumer is unsubscribing from
|
|
||||||
// are committed since there will be no following rebalance
|
|
||||||
this.coordinator.maybeAutoCommitOffsetsNow();
|
|
||||||
|
|
||||||
log.debug("Unsubscribed all topics or patterns and assigned partitions");
|
log.debug("Unsubscribed all topics or patterns and assigned partitions");
|
||||||
this.subscriptions.unsubscribe();
|
this.subscriptions.unsubscribe();
|
||||||
this.coordinator.maybeLeaveGroup();
|
this.coordinator.maybeLeaveGroup();
|
||||||
|
|
|
||||||
|
|
@ -614,8 +614,7 @@ public class KafkaConsumerTest {
|
||||||
* do not immediately change, and the latest consumed offsets of its to-be-revoked
|
* do not immediately change, and the latest consumed offsets of its to-be-revoked
|
||||||
* partitions are properly committed (when auto-commit is enabled).
|
* partitions are properly committed (when auto-commit is enabled).
|
||||||
* Upon unsubscribing from subscribed topics the consumer subscription and assignment
|
* Upon unsubscribing from subscribed topics the consumer subscription and assignment
|
||||||
* are both updated right away and its consumed offsets are committed (if auto-commit
|
* are both updated right away but its consumed offsets are not auto committed.
|
||||||
* is enabled).
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSubscriptionChangesWithAutoCommitEnabled() {
|
public void testSubscriptionChangesWithAutoCommitEnabled() {
|
||||||
|
|
@ -722,21 +721,12 @@ public class KafkaConsumerTest {
|
||||||
assertTrue(consumer.assignment().size() == 2);
|
assertTrue(consumer.assignment().size() == 2);
|
||||||
assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0));
|
assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0));
|
||||||
|
|
||||||
// mock the offset commit response for to be revoked partitions
|
|
||||||
Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>();
|
|
||||||
partitionOffsets2.put(tp0, 2L);
|
|
||||||
partitionOffsets2.put(t3p0, 100L);
|
|
||||||
commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets2);
|
|
||||||
|
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
// verify that subscription and assignment are both cleared
|
// verify that subscription and assignment are both cleared
|
||||||
assertTrue(consumer.subscription().isEmpty());
|
assertTrue(consumer.subscription().isEmpty());
|
||||||
assertTrue(consumer.assignment().isEmpty());
|
assertTrue(consumer.assignment().isEmpty());
|
||||||
|
|
||||||
// verify that the offset commits occurred as expected
|
|
||||||
assertTrue(commitReceived.get());
|
|
||||||
|
|
||||||
consumer.close();
|
consumer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue