diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 052dc5c8b45..68698ab7b8d 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -233,6 +233,21 @@ public class Selector implements Selectable, AutoCloseable { this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext); } + /** + * Generates a unique connection ID for the given socket. + * + * @param socket The socket for which the connection ID is to be generated. + * @param connectionIndex The index to be used in the connection ID to ensure uniqueness. + * @return A string representing the unique connection ID. + */ + public static String generateConnectionId(Socket socket, int connectionIndex) { + String localHost = socket.getLocalAddress().getHostAddress(); + int localPort = socket.getLocalPort(); + String remoteHost = socket.getInetAddress().getHostAddress(); + int remotePort = socket.getPort(); + return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + connectionIndex; + } + /** * Begin connecting to the given address and add the connection to this nioSelector associated with the given id * number. diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java index 78f6cecd005..512a98e5ccf 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -53,7 +53,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -90,7 +89,7 @@ public class NioEchoServer extends Thread { private volatile boolean closeKafkaChannels; private final DelegationTokenCache tokenCache; private final Time time; - private final AtomicLong idGenerator = new AtomicLong(); + private int nextConnectionIndex = 0; public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache, Time time) throws Exception { @@ -228,8 +227,7 @@ public class NioEchoServer extends Thread { selector.poll(100); synchronized (newChannels) { for (SocketChannel socketChannel : newChannels) { - String id = id(); - selector.register(id, socketChannel); + selector.register(id(socketChannel), socketChannel); socketChannels.add(socketChannel); } newChannels.clear(); @@ -279,8 +277,13 @@ public class NioEchoServer extends Thread { return false; } - private String id() { - return "connection-" + idGenerator.getAndIncrement(); + private String id(SocketChannel channel) { + String connectionId = Selector.generateConnectionId(channel.socket(), nextConnectionIndex); + if (nextConnectionIndex == Integer.MAX_VALUE) + nextConnectionIndex = 0; + else + nextConnectionIndex = nextConnectionIndex + 1; + return connectionId; } private KafkaChannel channel(String id) { diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 42d1e6585e6..4352b26d958 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -1315,11 +1315,7 @@ private[kafka] class Processor( // 'protected` to allow override for testing protected[network] def connectionId(socket: Socket): String = { - val localHost = socket.getLocalAddress.getHostAddress - val localPort = socket.getLocalPort - val remoteHost = socket.getInetAddress.getHostAddress - val remotePort = socket.getPort - val connId = ConnectionId(localHost, localPort, remoteHost, remotePort, nextConnectionIndex).toString + val connId = KSelector.generateConnectionId(socket, nextConnectionIndex) nextConnectionIndex = if (nextConnectionIndex == Int.MaxValue) 0 else nextConnectionIndex + 1 connId }