mirror of https://github.com/apache/kafka.git
KAFKA-2342; KafkaConsumer rebalance with in-flight fetch can cause invalid position
This commit is contained in:
parent
2040890462
commit
cabb017731
|
@ -219,7 +219,7 @@ public class Fetcher<K, V> {
|
|||
for (PartitionRecords<K, V> part : this.records) {
|
||||
Long consumed = subscriptions.consumed(part.partition);
|
||||
if (this.subscriptions.assignedPartitions().contains(part.partition)
|
||||
&& (consumed == null || part.fetchOffset == consumed)) {
|
||||
&& consumed != null && part.fetchOffset == consumed) {
|
||||
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
|
||||
if (records == null) {
|
||||
records = part.records;
|
||||
|
@ -364,6 +364,20 @@ public class Fetcher<K, V> {
|
|||
parsed.add(parseRecord(tp, logEntry));
|
||||
bytes += logEntry.size();
|
||||
}
|
||||
|
||||
// we are interested in this fetch only if the beginning offset matches the
|
||||
// current consumed position
|
||||
Long consumed = subscriptions.consumed(tp);
|
||||
if (consumed == null) {
|
||||
continue;
|
||||
} else if (consumed != fetchOffset) {
|
||||
// the fetched position has gotten out of sync with the consumed position
|
||||
// (which might happen when a rebalance occurs with a fetch in-flight),
|
||||
// so we need to reset the fetch position so the next fetch is right
|
||||
subscriptions.fetched(tp, consumed);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (parsed.size() > 0) {
|
||||
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
|
||||
this.subscriptions.fetched(tp, record.offset() + 1);
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -114,9 +115,26 @@ public class FetcherTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchDuringRebalance() {
|
||||
subscriptions.subscribe(topicName);
|
||||
subscriptions.changePartitionAssignment(Arrays.asList(tp));
|
||||
subscriptions.fetched(tp, 0);
|
||||
subscriptions.consumed(tp, 0);
|
||||
|
||||
fetcher.initFetches(cluster);
|
||||
|
||||
// Now the rebalance happens and fetch positions are cleared
|
||||
subscriptions.changePartitionAssignment(Arrays.asList(tp));
|
||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
|
||||
consumerClient.poll(0);
|
||||
|
||||
// The active fetch should be ignored since its position is no longer valid
|
||||
assertTrue(fetcher.fetchedRecords().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchFailed() {
|
||||
List<ConsumerRecord<byte[], byte[]>> records;
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.fetched(tp, 0);
|
||||
subscriptions.consumed(tp, 0);
|
||||
|
@ -148,7 +166,6 @@ public class FetcherTest {
|
|||
|
||||
@Test
|
||||
public void testFetchOutOfRange() {
|
||||
List<ConsumerRecord<byte[], byte[]>> records;
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.fetched(tp, 5);
|
||||
subscriptions.consumed(tp, 5);
|
||||
|
|
|
@ -58,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
|
|||
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
|
||||
}
|
||||
|
||||
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
|
||||
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
|
||||
|
||||
/*
|
||||
* 1. Produce a bunch of messages
|
||||
|
|
Loading…
Reference in New Issue