KAFKA-6916; Refresh metadata in admin client if broker connection fails (#5050)

Refresh metadata if broker connection fails so that new calls are sent only to nodes that are alive and requests to controller are sent to the new controller if controller changes due to broker failure. Also reassign calls that could not be sent.

Reviewers: Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Rajini Sivaram 2018-05-29 16:37:17 +01:00 committed by Jason Gustafson
parent 24a7b1c6a5
commit 3a8d3a7927
3 changed files with 43 additions and 2 deletions

View File

@ -991,6 +991,29 @@ public class KafkaAdminClient extends AdminClient {
}
}
/**
* Reassign calls that have not yet been sent. When metadata is refreshed,
* all unsent calls are reassigned to handle controller change and node changes.
* When a node is disconnected, all calls assigned to the node are reassigned.
*
* @param now The current time in milliseconds
* @param disconnectedOnly Reassign only calls to nodes that were disconnected
* in the last poll
*/
private void reassignUnsentCalls(long now, boolean disconnectedOnly) {
ArrayList<Call> pendingCallsToSend = new ArrayList<>();
for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Node, List<Call>> entry = iter.next();
if (!disconnectedOnly || client.connectionFailed(entry.getKey())) {
for (Call call : entry.getValue()) {
pendingCallsToSend.add(call);
}
iter.remove();
}
}
chooseNodesForPendingCalls(now, pendingCallsToSend.iterator());
}
private boolean hasActiveExternalCalls(Collection<Call> calls) {
for (Call call : calls) {
if (!call.isInternal()) {
@ -1075,6 +1098,7 @@ public class KafkaAdminClient extends AdminClient {
// Update the current time and handle the latest responses.
now = time.milliseconds();
reassignUnsentCalls(now, true); // reassign calls to disconnected nodes
handleResponses(now, responses);
}
int numTimedOut = 0;
@ -1158,7 +1182,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
metadataManager.update(response.cluster(), time.milliseconds());
long now = time.milliseconds();
metadataManager.update(response.cluster(), now);
reassignUnsentCalls(now, false);
}
@Override

View File

@ -116,7 +116,7 @@ public class AdminMetadataManager {
@Override
public void requestUpdate() {
// Do nothing
AdminMetadataManager.this.requestUpdate();
}
}

View File

@ -219,6 +219,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
waitForTopics(client, List(), topics)
}
@Test
def testMetadataRefresh(): Unit = {
client = AdminClient.create(createConfig())
val topics = Seq("mytopic")
val newTopics = Seq(new NewTopic("mytopic", 3, 3))
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, expectedPresent = topics, expectedMissing = List())
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
controller.shutdown()
controller.awaitShutdown()
val topicDesc = client.describeTopics(topics.asJava).all.get()
assertEquals(topics.toSet, topicDesc.keySet.asScala)
}
/**
* describe should not auto create topics
*/