KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>
This commit is contained in:
Guozhang Wang 2020-10-01 17:57:00 -07:00 committed by GitHub
parent ad17ea1089
commit 3bc2df7651
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 15 deletions

View File

@ -948,7 +948,7 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
resetState();
resetStateAndRejoin();
}
synchronized void resetGenerationOnLeaveGroup() {

View File

@ -120,6 +120,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -1842,7 +1843,7 @@ public class KafkaConsumerTest {
}
@Test
public void testReturnRecordsDuringRebalance() {
public void testReturnRecordsDuringRebalance() throws InterruptedException {
Time time = new MockTime(1L);
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
@ -1857,15 +1858,13 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
// a first poll with zero millisecond would not complete the rebalance
consumer.poll(Duration.ZERO);
// a poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100L));
return consumer.assignment().equals(Utils.mkSet(tp0, t2p0));
}, "Does not complete rebalance in time");
assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
assertEquals(Collections.emptySet(), consumer.assignment());
// a second poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
consumer.poll(Duration.ofMillis(100L));
assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
// prepare a response of the outstanding fetch so that we have data available on the next poll
@ -1918,7 +1917,6 @@ public class KafkaConsumerTest {
// mock rebalance responses
client.respondFrom(joinGroupFollowerResponse(assignor, 2, "memberId", "leaderId", Errors.NONE), coordinator);
client.prepareResponseFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator);
// we need to poll 1) for getting the join response, and then send the sync request;
// 2) for getting the sync response
@ -1934,12 +1932,19 @@ public class KafkaConsumerTest {
fetches1.put(tp0, new FetchInfo(3, 1));
client.respondFrom(fetchResponse(fetches1), node);
records = consumer.poll(Duration.ZERO);
// now complete the rebalance
client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator);
AtomicInteger count = new AtomicInteger(0);
TestUtils.waitForCondition(() -> {
ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(100L));
return consumer.assignment().equals(Utils.mkSet(tp0, t3p0)) && count.addAndGet(recs.count()) == 1;
}, "Does not complete rebalance in time");
// should have t3 but not sent yet the t3 records
assertEquals(Utils.mkSet(topic, topic3), consumer.subscription());
assertEquals(Utils.mkSet(tp0, t3p0), consumer.assignment());
assertEquals(1, records.count());
assertEquals(4L, consumer.position(tp0));
assertEquals(0L, consumer.position(t3p0));
@ -1948,10 +1953,13 @@ public class KafkaConsumerTest {
fetches1.put(t3p0, new FetchInfo(0, 100));
client.respondFrom(fetchResponse(fetches1), node);
records = consumer.poll(Duration.ZERO);
count.set(0);
TestUtils.waitForCondition(() -> {
ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(100L));
return count.addAndGet(recs.count()) == 101;
}, "Does not complete rebalance in time");
// should have t3 but not sent yet the t3 records
assertEquals(101, records.count());
assertEquals(5L, consumer.position(tp0));
assertEquals(100L, consumer.position(t3p0));