KAFKA-2342; KafkaConsumer rebalance with in-flight fetch can cause invalid position

This commit is contained in:
Jason Gustafson 2015-07-20 16:29:46 -07:00
parent 2040890462
commit cabb017731
3 changed files with 35 additions and 4 deletions

View File

@ -219,7 +219,7 @@ public class Fetcher<K, V> {
for (PartitionRecords<K, V> part : this.records) {
Long consumed = subscriptions.consumed(part.partition);
if (this.subscriptions.assignedPartitions().contains(part.partition)
&& (consumed == null || part.fetchOffset == consumed)) {
&& consumed != null && part.fetchOffset == consumed) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
@ -364,6 +364,20 @@ public class Fetcher<K, V> {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long consumed = subscriptions.consumed(tp);
if (consumed == null) {
continue;
} else if (consumed != fetchOffset) {
// the fetched position has gotten out of sync with the consumed position
// (which might happen when a rebalance occurs with a fetch in-flight),
// so we need to reset the fetch position so the next fetch is right
subscriptions.fetched(tp, consumed);
continue;
}
if (parsed.size() > 0) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);

View File

@ -36,6 +36,7 @@ import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@ -114,9 +115,26 @@ public class FetcherTest {
}
}
@Test
public void testFetchDuringRebalance() {
// Regression test for KAFKA-2342: a fetch response that arrives after a
// rebalance must not be used to advance the consumer's position, because
// the rebalance resets/clears the fetch positions for reassigned partitions.
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Arrays.asList(tp));
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
// Send a fetch request for offset 0 while the partition is assigned.
fetcher.initFetches(cluster);
// Now the rebalance happens and fetch positions are cleared
subscriptions.changePartitionAssignment(Arrays.asList(tp));
// Deliver the (now stale) fetch response for the in-flight request.
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
consumerClient.poll(0);
// The active fetch should be ignored since its position is no longer valid
assertTrue(fetcher.fetchedRecords().isEmpty());
}
@Test
public void testFetchFailed() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
@ -148,7 +166,6 @@ public class FetcherTest {
@Test
public void testFetchOutOfRange() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 5);
subscriptions.consumed(tp, 5);

View File

@ -58,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
/*
* 1. Produce a bunch of messages