mirror of https://github.com/apache/kafka.git
KAFKA-2342; KafkaConsumer rebalance with in-flight fetch can cause invalid position
This commit is contained in:
parent
2040890462
commit
cabb017731
|
@ -219,7 +219,7 @@ public class Fetcher<K, V> {
|
||||||
for (PartitionRecords<K, V> part : this.records) {
|
for (PartitionRecords<K, V> part : this.records) {
|
||||||
Long consumed = subscriptions.consumed(part.partition);
|
Long consumed = subscriptions.consumed(part.partition);
|
||||||
if (this.subscriptions.assignedPartitions().contains(part.partition)
|
if (this.subscriptions.assignedPartitions().contains(part.partition)
|
||||||
&& (consumed == null || part.fetchOffset == consumed)) {
|
&& consumed != null && part.fetchOffset == consumed) {
|
||||||
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
|
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
|
||||||
if (records == null) {
|
if (records == null) {
|
||||||
records = part.records;
|
records = part.records;
|
||||||
|
@ -364,6 +364,20 @@ public class Fetcher<K, V> {
|
||||||
parsed.add(parseRecord(tp, logEntry));
|
parsed.add(parseRecord(tp, logEntry));
|
||||||
bytes += logEntry.size();
|
bytes += logEntry.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we are interested in this fetch only if the beginning offset matches the
|
||||||
|
// current consumed position
|
||||||
|
Long consumed = subscriptions.consumed(tp);
|
||||||
|
if (consumed == null) {
|
||||||
|
continue;
|
||||||
|
} else if (consumed != fetchOffset) {
|
||||||
|
// the fetched position has gotten out of sync with the consumed position
|
||||||
|
// (which might happen when a rebalance occurs with a fetch in-flight),
|
||||||
|
// so we need to reset the fetch position so the next fetch is right
|
||||||
|
subscriptions.fetched(tp, consumed);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (parsed.size() > 0) {
|
if (parsed.size() > 0) {
|
||||||
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
|
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
|
||||||
this.subscriptions.fetched(tp, record.offset() + 1);
|
this.subscriptions.fetched(tp, record.offset() + 1);
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -114,9 +115,26 @@ public class FetcherTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchDuringRebalance() {
|
||||||
|
subscriptions.subscribe(topicName);
|
||||||
|
subscriptions.changePartitionAssignment(Arrays.asList(tp));
|
||||||
|
subscriptions.fetched(tp, 0);
|
||||||
|
subscriptions.consumed(tp, 0);
|
||||||
|
|
||||||
|
fetcher.initFetches(cluster);
|
||||||
|
|
||||||
|
// Now the rebalance happens and fetch positions are cleared
|
||||||
|
subscriptions.changePartitionAssignment(Arrays.asList(tp));
|
||||||
|
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
|
||||||
|
consumerClient.poll(0);
|
||||||
|
|
||||||
|
// The active fetch should be ignored since its position is no longer valid
|
||||||
|
assertTrue(fetcher.fetchedRecords().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchFailed() {
|
public void testFetchFailed() {
|
||||||
List<ConsumerRecord<byte[], byte[]>> records;
|
|
||||||
subscriptions.subscribe(tp);
|
subscriptions.subscribe(tp);
|
||||||
subscriptions.fetched(tp, 0);
|
subscriptions.fetched(tp, 0);
|
||||||
subscriptions.consumed(tp, 0);
|
subscriptions.consumed(tp, 0);
|
||||||
|
@ -148,7 +166,6 @@ public class FetcherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchOutOfRange() {
|
public void testFetchOutOfRange() {
|
||||||
List<ConsumerRecord<byte[], byte[]>> records;
|
|
||||||
subscriptions.subscribe(tp);
|
subscriptions.subscribe(tp);
|
||||||
subscriptions.fetched(tp, 5);
|
subscriptions.fetched(tp, 5);
|
||||||
subscriptions.consumed(tp, 5);
|
subscriptions.consumed(tp, 5);
|
||||||
|
|
|
@ -58,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
|
||||||
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
|
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
|
||||||
}
|
}
|
||||||
|
|
||||||
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
|
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 1. Produce a bunch of messages
|
* 1. Produce a bunch of messages
|
||||||
|
|
Loading…
Reference in New Issue