KAFKA-19561: Set OP_WRITE interest after SASL reauthentication to resume pending writes (#20258)

https://issues.apache.org/jira/browse/KAFKA-19561

Addresses a race condition during SASL reauthentication where the
server-side `KafkaChannel.send()` queues a response, but OP_WRITE is
removed before the channel becomes writable — resulting in stuck
responses and client timeouts.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Manikumar Reddy 2025-07-31 21:59:21 +05:30 committed by GitHub
parent dfaf9f9cf7
commit f73e3bcd6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 85 additions and 1 deletions

View File

@ -681,4 +681,14 @@ public class KafkaChannel implements AutoCloseable {
/**
 * Returns the {@link ChannelMetadataRegistry} held in this channel's {@code metadataRegistry} field.
 *
 * @return the value of {@code metadataRegistry}
 */
public ChannelMetadataRegistry channelMetadataRegistry() {
    return this.metadataRegistry;
}
/**
 * Adds {@code OP_WRITE} to the transport layer's interest ops when a send is still set on this
 * channel, so that a write left pending across re-authentication is resumed. Does nothing when
 * no send is set.
 */
public void maybeAddWriteInterestAfterReauth() {
    if (send == null) {
        // Nothing is waiting to be written, so the interest set is left as it is.
        return;
    }
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
}

View File

@ -551,6 +551,7 @@ public class Selector implements Selectable, AutoCloseable {
boolean isReauthentication = channel.successfulAuthentications() > 1;
if (isReauthentication) {
sensors.successfulReauthentication.record(1.0, readyTimeMs);
channel.maybeAddWriteInterestAfterReauth();
if (channel.reauthenticationLatencyMs() == null)
log.warn(
"Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");

View File

@ -44,6 +44,7 @@ import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
@ -120,6 +121,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -1856,6 +1858,69 @@ public class SaslAuthenticatorTest {
verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUIRED);
}
/**
 * Verifies that a send queued on the server side of a channel while re-authentication is in
 * progress is still delivered to the client once re-authentication completes.
 *
 * The test holds the single permit of {@code TestServerCallbackHandler.sem} so that the server's
 * {@code authenticate()} call blocks, queues a send directly on the server selector, releases the
 * permit, and then expects the client to receive exactly that payload.
 */
@Test
public void testServerSidePendingSendDuringReauthentication() throws Exception {
    SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
    TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
    jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), new HashMap<>());
    jaasConfig.setClientOptions("PLAIN", TestServerCallbackHandler.USERNAME, TestServerCallbackHandler.PASSWORD);
    String callbackPrefix = ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("PLAIN");
    saslServerConfigs.put(callbackPrefix + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
        TestServerCallbackHandler.class.getName());
    server = createEchoServer(securityProtocol);
    String node = "node1";
    // Tracks whether this test currently owns the semaphore permit, so that a failure between
    // acquire and release cannot leave the static semaphore drained for later authenticate() calls.
    boolean permitHeld = false;
    try {
        createClientConnection(securityProtocol, node);
        NetworkTestUtils.waitForChannelReady(selector, node);
        server.verifyAuthenticationMetrics(1, 0);

        /*
         * Now start the reauthentication on the connection. First, we have to sleep long enough so
         * that the next write will cause re-authentication
         */
        delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1));
        server.verifyReauthenticationMetrics(0, 0);

        // Hold the permit so that the server-side authenticate() call blocks and
        // re-authentication cannot complete yet.
        TestServerCallbackHandler.sem.acquire();
        permitHeld = true;

        String prefix = TestUtils.randomString(100);
        // send a client request to start a reauthentication.
        selector.send(new NetworkSend(node, ByteBufferSend.sizePrefixed(ByteBuffer.wrap((prefix + "-0").getBytes(StandardCharsets.UTF_8)))));
        // wait till reauthentication is blocked
        TestUtils.waitForCondition(() -> {
            selector.poll(10L);
            return TestServerCallbackHandler.sem.hasQueuedThreads();
        }, 5000, "Reauthentication is not blocked");

        // Set the client's channel `send` to null to allow setting a new send on the server's selector.
        // Without this, NioEchoServer will throw an error while processing the client request,
        // since we're manually setting a server side send to simulate the issue.
        TestUtils.setFieldValue(selector.channel(node), "send", null);

        // extract the channel id from the server's selector and directly set a send on it.
        String channelId = server.selector().channels().get(0).id();
        String payload = prefix + "-1";
        server.selector().send(new NetworkSend(channelId, ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))));

        // allow reauthentication to complete
        TestServerCallbackHandler.sem.release();
        permitHeld = false;

        TestUtils.waitForCondition(() -> {
            selector.poll(10L);
            // The first completed receive must be the payload queued on the server above.
            for (NetworkReceive receive : selector.completedReceives()) {
                assertEquals(payload, new String(Utils.toArray(receive.payload()), StandardCharsets.UTF_8));
                return true;
            }
            return false;
        }, 5000, "Failed Receive the server send after reauthentication");
        server.verifyReauthenticationMetrics(1, 0);
    } finally {
        // Give the permit back first if the test failed while holding it, so a thread blocked in
        // authenticate() can proceed; then close the client connection.
        if (permitHeld)
            TestServerCallbackHandler.sem.release();
        closeClientConnectionIfNecessary();
    }
}
private void verifySslClientAuthForSaslSslListener(boolean useListenerPrefix,
SslClientAuth configuredClientAuth) throws Exception {
@ -2311,6 +2376,7 @@ public class SaslAuthenticatorTest {
// The only username/password pair that authenticate() accepts.
static final String USERNAME = "TestServerCallbackHandler-user";
static final String PASSWORD = "TestServerCallbackHandler-password";
// Checked by authenticate(), which throws IllegalStateException while this is false.
// NOTE(review): presumably set by configure(); its body is not visible here - confirm.
private volatile boolean configured;
// Single-permit semaphore that authenticate() acquires and releases around its credential check.
// A test that holds the permit makes authenticate() block until the permit is released.
public static Semaphore sem = new Semaphore(1);
@Override
public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
@ -2324,7 +2390,14 @@ public class SaslAuthenticatorTest {
protected boolean authenticate(String username, char[] password) {
    /*
     * Accepts only the USERNAME/PASSWORD pair of this handler. The check runs while holding the
     * permit of `sem`, so a test that has taken the permit blocks this call until it releases it.
     * Throws IllegalStateException when the handler has not been configured, and RuntimeException
     * (wrapping the InterruptedException) when the wait for the permit is interrupted.
     */
    if (!configured)
        throw new IllegalStateException("Server callback handler not configured");
    // Acquire outside the try/finally below: if the wait is interrupted no permit was obtained,
    // so none must be released.
    try {
        sem.acquire();
    } catch (InterruptedException e) {
        // Preserve the interrupt status for the calling thread before wrapping the exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    try {
        return USERNAME.equals(username) && new String(password).equals(PASSWORD);
    } finally {
        sem.release();
    }
}
}