KAFKA-5179; Log connection termination during authentication

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma, Jun Rao

Closes #2980 from rajinisivaram/KAFKA-5179
This commit is contained in:
Rajini Sivaram 2017-05-15 18:13:20 -04:00
parent 46aa88b9cf
commit 4c75f31a5f
14 changed files with 156 additions and 57 deletions

View File

@ -8,7 +8,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
@ -482,10 +483,21 @@ public class NetworkClient implements KafkaClient {
* @param nodeId Id of the node to be disconnected
* @param now The current time
*/
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
connectionStates.disconnected(nodeId, now);
apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
switch (disconnectState) {
case AUTHENTICATE:
log.warn("Connection to node {} terminated during authentication. This may indicate " +
"that authentication failed due to invalid credentials.", nodeId);
break;
case NOT_CONNECTED:
log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId);
break;
default:
break; // Disconnections in other states are logged at debug level in Selector
}
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
@ -508,7 +520,7 @@ public class NetworkClient implements KafkaClient {
// close connection to the node
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now);
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
// we disconnected, so we should probably refresh our metadata
@ -567,7 +579,7 @@ public class NetworkClient implements KafkaClient {
log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.",
node, apiVersionsResponse.error());
this.selector.close(node);
processDisconnection(responses, node, now);
processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
} else {
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0));
}
@ -588,9 +600,10 @@ public class NetworkClient implements KafkaClient {
* @param now The current time
*/
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (String node : this.selector.disconnected()) {
for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
String node = entry.getKey();
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now);
processDisconnection(responses, node, now, entry.getValue());
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
@ -710,6 +723,10 @@ public class NetworkClient implements KafkaClient {
@Override
public void handleDisconnection(String destination) {
Cluster cluster = metadata.fetch();
// 'processDisconnection' generates warnings for misconfigured bootstrap server configuration
// resulting in 'Connection Refused' and misconfigured security resulting in authentication failures.
// The warning below handles the case where connection to a broker was established, but was disconnected
// before metadata could be obtained.
if (cluster.isBootstrapConfigured()) {
int nodeId = Integer.parseInt(destination);
Node node = cluster.nodeById(nodeId);

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
/**
* States for KafkaChannel:
* <ul>
* <li>NOT_CONNECTED: Connections are created in NOT_CONNECTED state. State is updated
* on {@link TransportLayer#finishConnect()} when socket connection is established.
* PLAINTEXT channels transition from NOT_CONNECTED to READY, others transition
* to AUTHENTICATE. Failures in NOT_CONNECTED state typically indicate that the
* remote endpoint is unavailable, which may be due to misconfigured endpoints.</li>
* <li>AUTHENTICATE: SSL, SASL_SSL and SASL_PLAINTEXT channels are in AUTHENTICATE state during SSL and
* SASL handshake. Disconnections in AUTHENTICATE state may indicate that SSL or SASL
* authentication failed. Channels transition to READY state when authentication completes
* successfully.</li>
* <li>READY: Connected, authenticated channels are in READY state. Channels may transition from
* READY to EXPIRED, FAILED_SEND or LOCAL_CLOSE.</li>
* <li>EXPIRED: Idle connections are moved to EXPIRED state on idle timeout and the channel is closed.</li>
* <li>FAILED_SEND: Channels transition from READY to FAILED_SEND state if the channel is closed due
* to a send failure.</li>
* <li>LOCAL_CLOSE: Channels are moved to LOCAL_CLOSE state if close() is initiated locally.</li>
* </ul>
* If the remote endpoint closes a channel, the state of the channel reflects the state the channel
* was in at the time of disconnection. This state may be useful to identify the reason for disconnection.
* <p>
* Typical transitions:
* <ul>
* <li>PLAINTEXT Good path: NOT_CONNECTED => READY => LOCAL_CLOSE</li>
* <li>SASL/SSL Good path: NOT_CONNECTED => AUTHENTICATE => READY => LOCAL_CLOSE</li>
* <li>Bootstrap server misconfiguration: NOT_CONNECTED, disconnected in NOT_CONNECTED state</li>
* <li>Security misconfiguration: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state</li>
* </ul>
*/
public enum ChannelState {
NOT_CONNECTED,
AUTHENTICATE,
READY,
EXPIRED,
FAILED_SEND,
LOCAL_CLOSE
}

View File

@ -41,6 +41,7 @@ public class KafkaChannel {
// processed after the channel is disconnected.
private boolean disconnected;
private boolean muted;
private ChannelState state;
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
this.id = id;
@ -50,6 +51,7 @@ public class KafkaChannel {
this.maxReceiveSize = maxReceiveSize;
this.disconnected = false;
this.muted = false;
this.state = ChannelState.NOT_CONNECTED;
}
public void close() throws IOException {
@ -72,6 +74,8 @@ public class KafkaChannel {
transportLayer.handshake();
if (transportLayer.ready() && !authenticator.complete())
authenticator.authenticate();
if (ready())
state = ChannelState.READY;
}
public void disconnect() {
@ -79,9 +83,19 @@ public class KafkaChannel {
transportLayer.disconnect();
}
public void state(ChannelState state) {
this.state = state;
}
public ChannelState state() {
return this.state;
}
public boolean finishConnect() throws IOException {
return transportLayer.finishConnect();
boolean connected = transportLayer.finishConnect();
if (connected)
state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
return connected;
}
public boolean isConnected() {

View File

@ -20,6 +20,7 @@ package org.apache.kafka.common.network;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
/**
* An interface for asynchronous, multi-channel network I/O
@ -80,10 +81,10 @@ public interface Selectable {
public List<NetworkReceive> completedReceives();
/**
* The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
* call.
* The connections that finished disconnecting on the last {@link #poll(long) poll()}
* call. Channel state indicates the local channel state at the time of disconnection.
*/
public List<String> disconnected();
public Map<String, ChannelState> disconnected();
/**
* The list of connections that completed their connection on the last {@link #poll(long) poll()}

View File

@ -92,7 +92,7 @@ public class Selector implements Selectable, AutoCloseable {
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
private final List<String> disconnected;
private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
private final Time time;
@ -137,7 +137,7 @@ public class Selector implements Selectable, AutoCloseable {
this.immediatelyConnectedKeys = new HashSet<>();
this.closingChannels = new HashMap<>();
this.connected = new ArrayList<>();
this.disconnected = new ArrayList<>();
this.disconnected = new HashMap<>();
this.failedSends = new ArrayList<>();
this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
this.channelBuilder = channelBuilder;
@ -413,7 +413,7 @@ public class Selector implements Selectable, AutoCloseable {
}
@Override
public List<String> disconnected() {
public Map<String, ChannelState> disconnected() {
return this.disconnected;
}
@ -466,6 +466,7 @@ public class Selector implements Selectable, AutoCloseable {
if (log.isTraceEnabled())
log.trace("About to close the idle connection from {} due to being idle for {} millis",
connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
channel.state(ChannelState.EXPIRED);
close(channel, true);
}
}
@ -489,7 +490,12 @@ public class Selector implements Selectable, AutoCloseable {
it.remove();
}
}
this.disconnected.addAll(this.failedSends);
for (String channel : this.failedSends) {
KafkaChannel failedChannel = closingChannels.get(channel);
if (failedChannel != null)
failedChannel.state(ChannelState.FAILED_SEND);
this.disconnected.put(channel, ChannelState.FAILED_SEND);
}
this.failedSends.clear();
}
@ -516,8 +522,12 @@ public class Selector implements Selectable, AutoCloseable {
*/
public void close(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel != null)
if (channel != null) {
// There is no disconnect notification for local close, but updating
// channel state here anyway to avoid confusion.
channel.state(ChannelState.LOCAL_CLOSE);
close(channel, false);
}
}
/**
@ -566,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
this.sensors.connectionClosed.record();
this.stagedReceives.remove(channel);
if (notifyDisconnect)
this.disconnected.add(channel.id());
this.disconnected.put(channel.id(), channel.state());
}
/**

View File

@ -185,8 +185,8 @@ public class NetworkClientTest {
// sleeping to make sure that the time since last send is greater than requestTimeOut
time.sleep(3000);
client.poll(3000, time.milliseconds());
String disconnectedNode = selector.disconnected().get(0);
assertEquals(node.idString(), disconnectedNode);
assertEquals(1, selector.disconnected().size());
assertTrue("Node not found in disconnected map", selector.disconnected().containsKey(node.idString()));
}
@Test

View File

@ -77,7 +77,7 @@ public class NetworkTestUtils {
assertTrue(selector.isChannelReady(node));
}
public static void waitForChannelClose(Selector selector, String node) throws IOException {
public static void waitForChannelClose(Selector selector, String node, ChannelState channelState) throws IOException {
boolean closed = false;
for (int i = 0; i < 30; i++) {
selector.poll(1000L);
@ -87,5 +87,6 @@ public class NetworkTestUtils {
}
}
assertTrue("Channel was not closed by timeout", closed);
assertEquals(channelState, selector.disconnected().get(node));
}
}

View File

@ -81,7 +81,7 @@ public class SelectorTest {
// disconnect
this.server.closeConnections();
while (!selector.disconnected().contains(node))
while (!selector.disconnected().containsKey(node))
selector.poll(1000L);
// reconnect and do another request
@ -127,8 +127,10 @@ public class SelectorTest {
ServerSocket nonListeningSocket = new ServerSocket(0);
int nonListeningPort = nonListeningSocket.getLocalPort();
selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
while (selector.disconnected().contains(node))
while (selector.disconnected().containsKey(node)) {
assertEquals(ChannelState.NOT_CONNECTED, selector.disconnected().get(node));
selector.poll(1000L);
}
nonListeningSocket.close();
}
@ -262,7 +264,8 @@ public class SelectorTest {
time.sleep(6000); // The max idle time is 5000ms
selector.poll(0);
assertTrue("The idle connection should have been closed", selector.disconnected().contains(id));
assertTrue("The idle connection should have been closed", selector.disconnected().containsKey(id));
assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
}

View File

@ -154,7 +154,7 @@ public class SslSelectorTest extends SelectorTest {
List<String> disconnected = new ArrayList<>();
while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
selector.poll(10);
disconnected.addAll(selector.disconnected());
disconnected.addAll(selector.disconnected().keySet());
}
assertTrue("Renegotiation should cause disconnection", disconnected.contains(node));

View File

@ -119,7 +119,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@ -184,7 +184,7 @@ public class SslTransportLayerTest {
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
createSelector(sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
selector.close();
server.close();
@ -212,7 +212,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@ -232,7 +232,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@ -384,7 +384,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@ -401,7 +401,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@ -419,7 +419,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
@ -137,8 +138,7 @@ public class SaslAuthenticatorTest {
jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword");
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@ -152,8 +152,7 @@ public class SaslAuthenticatorTest {
jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD);
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@ -286,8 +285,7 @@ public class SaslAuthenticatorTest {
String node = "0";
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@ -305,8 +303,7 @@ public class SaslAuthenticatorTest {
String node = "0";
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@ -323,8 +320,7 @@ public class SaslAuthenticatorTest {
server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
String node = "1";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
createAndCheckClientConnection(securityProtocol, "2");
@ -425,7 +421,7 @@ public class SaslAuthenticatorTest {
SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
selector.send(request.toSend(node1, header));
NetworkTestUtils.waitForChannelClose(selector, node1);
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@ -451,7 +447,7 @@ public class SaslAuthenticatorTest {
byte[] bytes = new byte[1024];
random.nextBytes(bytes);
selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
NetworkTestUtils.waitForChannelClose(selector, node1);
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@ -462,7 +458,7 @@ public class SaslAuthenticatorTest {
createClientConnection(SecurityProtocol.PLAINTEXT, node2);
random.nextBytes(bytes);
selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
NetworkTestUtils.waitForChannelClose(selector, node2);
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
selector.close();
// Test good connection still works
@ -491,7 +487,7 @@ public class SaslAuthenticatorTest {
RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id,
request.version(), "someclient", 2);
selector.send(request.toSend(node1, versionsHeader));
NetworkTestUtils.waitForChannelClose(selector, node1);
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@ -518,7 +514,7 @@ public class SaslAuthenticatorTest {
buffer.put(new byte[buffer.capacity() - 4]);
buffer.rewind();
selector.send(new NetworkSend(node1, buffer));
NetworkTestUtils.waitForChannelClose(selector, node1);
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@ -532,7 +528,7 @@ public class SaslAuthenticatorTest {
buffer.put(new byte[buffer.capacity() - 4]);
buffer.rewind();
selector.send(new NetworkSend(node2, buffer));
NetworkTestUtils.waitForChannelClose(selector, node2);
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
selector.close();
// Test good connection still works
@ -557,7 +553,7 @@ public class SaslAuthenticatorTest {
RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id,
metadataRequest1.version(), "someclient", 1);
selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
NetworkTestUtils.waitForChannelClose(selector, node1);
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@ -572,7 +568,7 @@ public class SaslAuthenticatorTest {
RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id,
metadataRequest2.version(), "someclient", 2);
selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
NetworkTestUtils.waitForChannelClose(selector, node2);
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
selector.close();
// Test good connection still works
@ -608,8 +604,7 @@ public class SaslAuthenticatorTest {
configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@ -623,8 +618,7 @@ public class SaslAuthenticatorTest {
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@ -824,7 +818,7 @@ public class SaslAuthenticatorTest {
private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
selector.close();
selector = null;
}

View File

@ -19,9 +19,12 @@ package org.apache.kafka.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
@ -37,7 +40,7 @@ public class MockSelector implements Selectable {
private final List<Send> initiatedSends = new ArrayList<Send>();
private final List<Send> completedSends = new ArrayList<Send>();
private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
private final List<String> disconnected = new ArrayList<String>();
private final Map<String, ChannelState> disconnected = new HashMap<>();
private final List<String> connected = new ArrayList<String>();
private final List<DelayedReceive> delayedReceives = new ArrayList<>();
@ -60,7 +63,7 @@ public class MockSelector implements Selectable {
@Override
public void close(String id) {
this.disconnected.add(id);
this.disconnected.put(id, ChannelState.LOCAL_CLOSE);
for (int i = 0; i < this.connected.size(); i++) {
if (this.connected.get(i).equals(id)) {
this.connected.remove(i);
@ -121,7 +124,7 @@ public class MockSelector implements Selectable {
}
@Override
public List<String> disconnected() {
public Map<String, ChannelState> disconnected() {
return disconnected;
}

View File

@ -544,7 +544,7 @@ private[kafka] class Processor(val id: Int,
}
private def processDisconnected() {
selector.disconnected.asScala.foreach { connectionId =>
selector.disconnected.keySet.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost