KAFKA-3659: Handle coordinator disconnects more gracefully in client

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Grant Henke <granthenke@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1322 from hachikuji/KAFKA-3659
Authored by Jason Gustafson on 2016-05-05 12:03:28 -07:00; committed by Ewen Cheslack-Postava
parent 694303a3da
commit 32bf83e5a7
8 changed files with 225 additions and 60 deletions

KafkaConsumer.java

@ -938,7 +938,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// ensure we have partitions assigned if we expect to
if (subscriptions.partitionsAutoAssigned())

AbstractCoordinator.java

@ -171,9 +171,9 @@ public abstract class AbstractCoordinator implements Closeable {
ByteBuffer memberAssignment);
/**
* Block until the coordinator for this group is known.
* Block until the coordinator for this group is known and is ready to receive requests.
*/
public void ensureCoordinatorKnown() {
public void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupCoordinatorRequest();
client.poll(future);
@ -183,7 +183,13 @@ public abstract class AbstractCoordinator implements Closeable {
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
coordinatorDead();
time.sleep(retryBackoffMs);
}
}
}
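For reference, the whole discovery loop after this change reads roughly as follows. The future.failed()/future.isRetriable() handling above the shown hunk is reconstructed from context, so treat this as a sketch rather than the final source:

    public void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupCoordinatorRequest();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // coordinator located, but the connection failed: mark it dead
                // and back off before retrying discovery
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }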
@ -208,7 +214,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
while (needRejoin()) {
ensureCoordinatorKnown();
ensureCoordinatorReady();
// ensure that there are no pending requests to the coordinator. This is important
// in particular to avoid resending a pending JoinGroup request.

ConsumerCoordinator.java

@ -321,7 +321,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
*/
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
while (true) {
ensureCoordinatorKnown();
ensureCoordinatorReady();
// contact coordinator to fetch committed offsets
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
@ -397,7 +397,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return;
while (true) {
ensureCoordinatorKnown();
ensureCoordinatorReady();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
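
The synchronous commit path keeps retrying until the commit succeeds or hits a non-retriable error, re-checking coordinator readiness on every pass. A sketch of that loop, with the success/retry handling below the shown lines reconstructed from context (names outside the hunk are assumptions):

    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty())
            return;

        while (true) {
            ensureCoordinatorReady();

            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
            client.poll(future);

            if (future.succeeded())
                return;
            if (!future.isRetriable())
                throw future.exception();

            // back off before re-discovering the coordinator and retrying
            time.sleep(retryBackoffMs);
        }
    }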

MockClient.java

@ -18,9 +18,11 @@ package org.apache.kafka.clients;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@ -58,6 +60,7 @@ public class MockClient implements KafkaClient {
private int correlation = 0;
private Node node = null;
private final Set<String> ready = new HashSet<>();
private final Map<Node, Long> blackedOut = new HashMap<>();
private final Queue<ClientRequest> requests = new ArrayDeque<>();
private final Queue<ClientResponse> responses = new ArrayDeque<>();
private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
@ -73,6 +76,8 @@ public class MockClient implements KafkaClient {
@Override
public boolean ready(Node node, long now) {
if (isBlackedOut(node))
return false;
ready.add(node.idString());
return true;
}
@ -82,9 +87,26 @@ public class MockClient implements KafkaClient {
return 0;
}
public void blackout(Node node, long duration) {
blackedOut.put(node, time.milliseconds() + duration);
}
private boolean isBlackedOut(Node node) {
if (blackedOut.containsKey(node)) {
long expiration = blackedOut.get(node);
if (time.milliseconds() > expiration) {
blackedOut.remove(node);
return false;
} else {
return true;
}
}
return false;
}
@Override
public boolean connectionFailed(Node node) {
return false;
return isBlackedOut(node);
}
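
The blackout support above is what lets a test simulate a coordinator whose connection has failed. A minimal usage sketch, assuming JUnit assertions and only the MockClient/MockTime APIs shown in this diff:

    MockTime time = new MockTime();
    MockClient client = new MockClient(time);
    Node node = new Node(0, "localhost", 9092);

    client.blackout(node, 50L);                        // connection attempts fail for the next 50 ms
    assertFalse(client.ready(node, time.milliseconds()));
    assertTrue(client.connectionFailed(node));

    time.sleep(51);                                    // once the blackout expires, the node is usable again
    assertTrue(client.ready(node, time.milliseconds()));
    assertFalse(client.connectionFailed(node));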
public void disconnect(String node) {

AbstractCoordinatorTest.java (new file)

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertTrue;
public class AbstractCoordinatorTest {
private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
private static final int SESSION_TIMEOUT_MS = 30000;
private static final int HEARTBEAT_INTERVAL_MS = 3000;
private static final long RETRY_BACKOFF_MS = 100;
private static final long REQUEST_TIMEOUT_MS = 40000;
private static final String GROUP_ID = "dummy-group";
private static final String METRIC_GROUP_PREFIX = "consumer";
private MockClient mockClient;
private MockTime mockTime;
private Node node;
private Node coordinatorNode;
private ConsumerNetworkClient consumerClient;
private DummyCoordinator coordinator;
@Before
public void setupCoordinator() {
this.mockTime = new MockTime();
this.mockClient = new MockClient(mockTime);
Metadata metadata = new Metadata();
this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime,
RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS);
Metrics metrics = new Metrics();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
metadata.update(cluster, mockTime.milliseconds());
this.node = cluster.nodes().get(0);
mockClient.setNode(node);
this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime);
}
@Test
public void testCoordinatorDiscoveryBackoff() {
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
// blackout the coordinator for 50 milliseconds to simulate a disconnect.
// after backing off, we should be able to connect.
mockClient.blackout(coordinatorNode, 50L);
long initialTime = mockTime.milliseconds();
coordinator.ensureCoordinatorReady();
long endTime = mockTime.milliseconds();
assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
}
private Struct groupCoordinatorResponse(Node node, short error) {
GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
return response.toStruct();
}
public class DummyCoordinator extends AbstractCoordinator {
public DummyCoordinator(ConsumerNetworkClient client,
Metrics metrics,
Time time) {
super(client, GROUP_ID, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics,
METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS);
}
@Override
protected String protocolType() {
return "dummy";
}
@Override
protected List<JoinGroupRequest.ProtocolMetadata> metadata() {
return Collections.singletonList(new JoinGroupRequest.ProtocolMetadata("dummy-subprotocol", EMPTY_DATA));
}
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
Map<String, ByteBuffer> assignment = new HashMap<>();
for (Map.Entry<String, ByteBuffer> metadata : allMemberMetadata.entrySet())
assignment.put(metadata.getKey(), EMPTY_DATA);
return assignment;
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
}
@Override
protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
}
}
}
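Note that the elapsed-time assertion in testCoordinatorDiscoveryBackoff is deterministic because the backoff inside ensureCoordinatorReady() is a time.sleep(retryBackoffMs) against MockTime, which only advances the mock clock rather than blocking. Illustrative snippet (not part of the test):

    MockTime time = new MockTime();
    long before = time.milliseconds();
    time.sleep(100L);                                  // advances the mock clock instantly; no real waiting
    assertTrue(time.milliseconds() - before >= 100L);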

ConsumerCoordinatorTest.java

@ -125,7 +125,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testNormalHeartbeat() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// normal heartbeat
time.sleep(sessionTimeoutMs);
@ -143,7 +143,7 @@ public class ConsumerCoordinatorTest {
@Test(expected = GroupAuthorizationException.class)
public void testGroupDescribeUnauthorized() {
client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
}
@Test(expected = GroupAuthorizationException.class)
@ -151,7 +151,7 @@ public class ConsumerCoordinatorTest {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
Errors.GROUP_AUTHORIZATION_FAILED.code()));
@ -161,7 +161,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
@ -182,7 +182,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// not_coordinator will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
@ -203,7 +203,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testIllegalGeneration() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// illegal_generation will cause re-partition
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@ -227,7 +227,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testUnknownConsumerId() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// illegal_generation will cause re-partition
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@ -251,7 +251,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCoordinatorDisconnect() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// coordinator disconnect will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
@ -281,7 +281,7 @@ public class ConsumerCoordinatorTest {
metadata.update(cluster, time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
Errors.INVALID_GROUP_ID.code()));
@ -300,7 +300,7 @@ public class ConsumerCoordinatorTest {
metadata.update(cluster, time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// normal join group
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
@ -338,7 +338,7 @@ public class ConsumerCoordinatorTest {
metadata.update(cluster, time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
@ -373,7 +373,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// normal join group
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@ -404,7 +404,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@ -432,7 +432,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@ -462,7 +462,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// join initially, but let coordinator rebalance on sync
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@ -478,7 +478,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// join initially, but let coordinator returns unknown member id
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@ -508,7 +508,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// join initially, but let coordinator rebalance on sync
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@ -532,7 +532,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// join initially, but let coordinator rebalance on sync
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@ -562,7 +562,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@ -597,7 +597,7 @@ public class ConsumerCoordinatorTest {
metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// prepare initial rebalance
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics);
@ -658,7 +658,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// join the group once
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
@ -686,7 +686,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// disconnected from original coordinator will cause re-discover and join again
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
@ -707,7 +707,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// coordinator doesn't like the session timeout
client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
@ -719,7 +719,7 @@ public class ConsumerCoordinatorTest {
subscriptions.assignFromUser(Arrays.asList(tp));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
@ -741,7 +741,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@ -767,7 +767,7 @@ public class ConsumerCoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// haven't joined, so should not cause a commit
time.sleep(autoCommitIntervalMs);
@ -795,7 +795,7 @@ public class ConsumerCoordinatorTest {
subscriptions.seek(tp, 100);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
time.sleep(autoCommitIntervalMs);
@ -821,7 +821,7 @@ public class ConsumerCoordinatorTest {
// now find the coordinator
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// sleep only for the retry backoff
time.sleep(retryBackoffMs);
@ -836,7 +836,7 @@ public class ConsumerCoordinatorTest {
subscriptions.assignFromUser(Arrays.asList(tp));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
@ -852,7 +852,7 @@ public class ConsumerCoordinatorTest {
public void testCommitOffsetAsyncWithDefaultCallback() {
int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
@ -865,7 +865,7 @@ public class ConsumerCoordinatorTest {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@ -896,7 +896,7 @@ public class ConsumerCoordinatorTest {
public void testCommitOffsetAsyncFailedWithDefaultCallback() {
int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
@ -906,7 +906,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetAsyncCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// async commit with coordinator not available
MockCommitCallback cb = new MockCommitCallback();
@ -921,7 +921,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetAsyncNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// async commit with not coordinator
MockCommitCallback cb = new MockCommitCallback();
@ -936,7 +936,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetAsyncDisconnected() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// async commit with coordinator disconnected
MockCommitCallback cb = new MockCommitCallback();
@ -951,7 +951,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetSyncNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
@ -963,7 +963,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetSyncCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
@ -975,7 +975,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetSyncCoordinatorDisconnected() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
@ -988,7 +988,7 @@ public class ConsumerCoordinatorTest {
public void testCommitOffsetMetadataTooLarge() {
// since offset metadata is provided by the user, we have to propagate the exception so they can handle it
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@ -998,7 +998,7 @@ public class ConsumerCoordinatorTest {
public void testCommitOffsetIllegalGeneration() {
// we cannot retry if a rebalance occurs before the commit completed
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@ -1008,7 +1008,7 @@ public class ConsumerCoordinatorTest {
public void testCommitOffsetUnknownMemberId() {
// we cannot retry if a rebalance occurs before the commit completed
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@ -1018,7 +1018,7 @@ public class ConsumerCoordinatorTest {
public void testCommitOffsetRebalanceInProgress() {
// we cannot retry if a rebalance occurs before the commit completed
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@ -1027,7 +1027,7 @@ public class ConsumerCoordinatorTest {
@Test(expected = KafkaException.class)
public void testCommitOffsetSyncCallbackWithNonRetriableException() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// sync commit with invalid partitions should throw if we have no callback
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false);
@ -1037,7 +1037,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testRefreshOffset() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
@ -1050,7 +1050,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testRefreshOffsetLoadInProgress() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
@ -1064,7 +1064,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testRefreshOffsetNotCoordinatorForConsumer() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
@ -1079,7 +1079,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testRefreshOffsetWithNoFetchableOffsets() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();

WorkerGroupMember.java

@ -131,7 +131,7 @@ public class WorkerGroupMember {
}
public void ensureActive() {
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
coordinator.ensureActiveGroup();
}
@ -143,7 +143,7 @@ public class WorkerGroupMember {
long remaining = timeout;
while (remaining >= 0) {
long start = time.milliseconds();
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
coordinator.ensureActiveGroup();
client.poll(remaining);
remaining -= time.milliseconds() - start;

WorkerCoordinatorTest.java

@ -171,7 +171,7 @@ public class WorkerCoordinatorTest {
final String consumerId = "leader";
client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// normal join group
Map<String, Long> memberConfigOffsets = new HashMap<>();
@ -211,7 +211,7 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// normal join group
client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
@ -252,7 +252,7 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// config mismatch results in assignment error
client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
@ -283,7 +283,7 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
coordinator.ensureCoordinatorReady();
// join the group once
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));