KAFKA-17032 NioEchoServer should generate meaningful id instead of incremental number (#16460)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TingIāu "Ting" Kì 2024-07-03 03:28:42 +08:00 committed by GitHub
parent 601659f951
commit c97d4ce026
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 25 additions and 11 deletions

View File

@ -233,6 +233,21 @@ public class Selector implements Selectable, AutoCloseable {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
}
/**
* Generates a unique connection ID for the given socket.
*
* @param socket The socket for which the connection ID is to be generated.
* @param connectionIndex The index to be used in the connection ID to ensure uniqueness.
* @return A string representing the unique connection ID.
*/
public static String generateConnectionId(Socket socket, int connectionIndex) {
String localHost = socket.getLocalAddress().getHostAddress();
int localPort = socket.getLocalPort();
String remoteHost = socket.getInetAddress().getHostAddress();
int remotePort = socket.getPort();
return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + connectionIndex;
}
/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.

View File

@ -53,7 +53,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -90,7 +89,7 @@ public class NioEchoServer extends Thread {
private volatile boolean closeKafkaChannels;
private final DelegationTokenCache tokenCache;
private final Time time;
private final AtomicLong idGenerator = new AtomicLong();
private int nextConnectionIndex = 0;
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache, Time time) throws Exception {
@ -228,8 +227,7 @@ public class NioEchoServer extends Thread {
selector.poll(100);
synchronized (newChannels) {
for (SocketChannel socketChannel : newChannels) {
String id = id();
selector.register(id, socketChannel);
selector.register(id(socketChannel), socketChannel);
socketChannels.add(socketChannel);
}
newChannels.clear();
@ -279,8 +277,13 @@ public class NioEchoServer extends Thread {
return false;
}
private String id() {
return "connection-" + idGenerator.getAndIncrement();
private String id(SocketChannel channel) {
String connectionId = Selector.generateConnectionId(channel.socket(), nextConnectionIndex);
if (nextConnectionIndex == Integer.MAX_VALUE)
nextConnectionIndex = 0;
else
nextConnectionIndex = nextConnectionIndex + 1;
return connectionId;
}
private KafkaChannel channel(String id) {

View File

@ -1315,11 +1315,7 @@ private[kafka] class Processor(
// 'protected` to allow override for testing
protected[network] def connectionId(socket: Socket): String = {
val localHost = socket.getLocalAddress.getHostAddress
val localPort = socket.getLocalPort
val remoteHost = socket.getInetAddress.getHostAddress
val remotePort = socket.getPort
val connId = ConnectionId(localHost, localPort, remoteHost, remotePort, nextConnectionIndex).toString
val connId = KSelector.generateConnectionId(socket, nextConnectionIndex)
nextConnectionIndex = if (nextConnectionIndex == Int.MaxValue) 0 else nextConnectionIndex + 1
connId
}