mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517)
If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, they are tracked to process those records. However, if the exception was encountered during processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed. Disable processing of outstanding staged receives if a send fails. This stops the leak of the memory for pending requests and the file descriptor of the TCP socket. Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala. Author: Graham Campbell <graham.campbell@salesforce.com> Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Ted Yu <yuzhihong@gmail.com>
This commit is contained in:
		
							parent
							
								
									63f4cc69ba
								
							
						
					
					
						commit
						73c646c442
					
				|  | @ -54,7 +54,7 @@ | |||
|               files="AbstractRequest.java"/> | ||||
| 
 | ||||
|     <suppress checks="NPathComplexity" | ||||
|               files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|PluginUtils).java"/> | ||||
|               files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Selector|Sender|Serdes|PluginUtils).java"/> | ||||
| 
 | ||||
|     <!-- clients tests --> | ||||
|     <suppress checks="ClassDataAbstractionCoupling" | ||||
|  |  | |||
|  | @ -269,9 +269,17 @@ public class Selector implements Selectable, AutoCloseable { | |||
|             KafkaChannel channel = channelOrFail(connectionId, false); | ||||
|             try { | ||||
|                 channel.setSend(send); | ||||
|             } catch (CancelledKeyException e) { | ||||
|             } catch (Exception e) { | ||||
|                 // update the state for consistency, the channel will be discarded after `close` | ||||
|                 channel.state(ChannelState.FAILED_SEND); | ||||
|                 // ensure notification via `disconnected` when `failedSends` are processed in the next poll | ||||
|                 this.failedSends.add(connectionId); | ||||
|                 close(channel, false); | ||||
|                 close(channel, false, false); | ||||
|                 if (!(e instanceof CancelledKeyException)) { | ||||
|                     log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", | ||||
|                             connectionId, e); | ||||
|                     throw e; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | @ -354,6 +362,7 @@ public class Selector implements Selectable, AutoCloseable { | |||
|             if (idleExpiryManager != null) | ||||
|                 idleExpiryManager.update(channel.id(), currentTimeNanos); | ||||
| 
 | ||||
|             boolean sendFailed = false; | ||||
|             try { | ||||
| 
 | ||||
|                 /* complete any connections that have finished their handshake (either normally or immediately) */ | ||||
|  | @ -384,7 +393,13 @@ public class Selector implements Selectable, AutoCloseable { | |||
| 
 | ||||
|                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ | ||||
|                 if (channel.ready() && key.isWritable()) { | ||||
|                     Send send = channel.write(); | ||||
|                     Send send = null; | ||||
|                     try { | ||||
|                         send = channel.write(); | ||||
|                     } catch (Exception e) { | ||||
|                         sendFailed = true; | ||||
|                         throw e; | ||||
|                     } | ||||
|                     if (send != null) { | ||||
|                         this.completedSends.add(send); | ||||
|                         this.sensors.recordBytesSent(channel.id(), send.size()); | ||||
|  | @ -393,7 +408,7 @@ public class Selector implements Selectable, AutoCloseable { | |||
| 
 | ||||
|                 /* cancel any defunct sockets */ | ||||
|                 if (!key.isValid()) | ||||
|                     close(channel, true); | ||||
|                     close(channel, true, true); | ||||
| 
 | ||||
|             } catch (Exception e) { | ||||
|                 String desc = channel.socketDescription(); | ||||
|  | @ -401,7 +416,7 @@ public class Selector implements Selectable, AutoCloseable { | |||
|                     log.debug("Connection with {} disconnected", desc, e); | ||||
|                 else | ||||
|                     log.warn("Unexpected error from {}; closing connection", desc, e); | ||||
|                 close(channel, true); | ||||
|                 close(channel, !sendFailed, true); | ||||
|             } finally { | ||||
|                 maybeRecordTimePerConnection(channel, channelStartTimeNanos); | ||||
|             } | ||||
|  | @ -479,7 +494,7 @@ public class Selector implements Selectable, AutoCloseable { | |||
|                     log.trace("About to close the idle connection from {} due to being idle for {} millis", | ||||
|                             connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); | ||||
|                 channel.state(ChannelState.EXPIRED); | ||||
|                 close(channel, true); | ||||
|                 close(channel, true, true); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | @ -538,7 +553,12 @@ public class Selector implements Selectable, AutoCloseable { | |||
|             // There is no disconnect notification for local close, but updating | ||||
|             // channel state here anyway to avoid confusion. | ||||
|             channel.state(ChannelState.LOCAL_CLOSE); | ||||
|             close(channel, false); | ||||
|             close(channel, false, false); | ||||
|         } else { | ||||
|             KafkaChannel closingChannel = this.closingChannels.remove(id); | ||||
|             // Close any closing channel, leave the channel in the state in which closing was triggered | ||||
|             if (closingChannel != null) | ||||
|                 doClose(closingChannel, false); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  | @ -553,7 +573,10 @@ public class Selector implements Selectable, AutoCloseable { | |||
|      * closed immediately. The channel will not be added to disconnected list and it is the | ||||
|      * responsibility of the caller to handle disconnect notifications. | ||||
|      */ | ||||
|     private void close(KafkaChannel channel, boolean processOutstanding) { | ||||
|     private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) { | ||||
| 
 | ||||
|         if (processOutstanding && !notifyDisconnect) | ||||
|             throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests"); | ||||
| 
 | ||||
|         channel.disconnect(); | ||||
| 
 | ||||
|  | @ -571,8 +594,9 @@ public class Selector implements Selectable, AutoCloseable { | |||
|         if (processOutstanding && deque != null && !deque.isEmpty()) { | ||||
|             // stagedReceives will be moved to completedReceives later along with receives from other channels | ||||
|             closingChannels.put(channel.id(), channel); | ||||
|             log.debug("Tracking closing connection {} to process outstanding requests", channel.id()); | ||||
|         } else | ||||
|             doClose(channel, processOutstanding); | ||||
|             doClose(channel, notifyDisconnect); | ||||
|         this.channels.remove(channel.id()); | ||||
| 
 | ||||
|         if (idleExpiryManager != null) | ||||
|  | @ -700,7 +724,7 @@ public class Selector implements Selectable, AutoCloseable { | |||
|     } | ||||
| 
 | ||||
|     // only for testing | ||||
|     int numStagedReceives(KafkaChannel channel) { | ||||
|     public int numStagedReceives(KafkaChannel channel) { | ||||
|         Deque<NetworkReceive> deque = stagedReceives.get(channel); | ||||
|         return deque == null ? 0 : deque.size(); | ||||
|     } | ||||
|  |  | |||
|  | @ -605,6 +605,14 @@ private[kafka] class Processor(val id: Int, | |||
|   private[network] def channel(connectionId: String): Option[KafkaChannel] = | ||||
|     Option(selector.channel(connectionId)) | ||||
| 
 | ||||
|   /* For test usage */ | ||||
|   private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] = | ||||
|     channel(connectionId).orElse(Option(selector.closingChannel(connectionId))) | ||||
| 
 | ||||
|   // Visible for testing | ||||
|   private[network] def numStagedReceives(connectionId: String): Int = | ||||
|     openOrClosingChannel(connectionId).map(c => selector.numStagedReceives(c)).getOrElse(0) | ||||
| 
 | ||||
|   /** | ||||
|    * Wakeup the thread for selection. | ||||
|    */ | ||||
|  |  | |||
|  | @ -65,7 +65,7 @@ class SocketServerTest extends JUnitSuite { | |||
|   server.startup() | ||||
|   val sockets = new ArrayBuffer[Socket] | ||||
| 
 | ||||
|   def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) { | ||||
|   def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true) { | ||||
|     val outgoing = new DataOutputStream(socket.getOutputStream) | ||||
|     id match { | ||||
|       case Some(id) => | ||||
|  | @ -75,7 +75,8 @@ class SocketServerTest extends JUnitSuite { | |||
|         outgoing.writeInt(request.length) | ||||
|     } | ||||
|     outgoing.write(request) | ||||
|     outgoing.flush() | ||||
|     if (flush) | ||||
|       outgoing.flush() | ||||
|   } | ||||
| 
 | ||||
|   def receiveResponse(socket: Socket): Array[Byte] = { | ||||
|  | @ -86,10 +87,15 @@ class SocketServerTest extends JUnitSuite { | |||
|     response | ||||
|   } | ||||
| 
 | ||||
|   private def receiveRequest(channel: RequestChannel, timeout: Long = 2000L): RequestChannel.Request = { | ||||
|     val request = channel.receiveRequest(timeout) | ||||
|     assertNotNull("receiveRequest timed out", request) | ||||
|     request | ||||
|   } | ||||
| 
 | ||||
|   /* A simple request handler that just echos back the response */ | ||||
|   def processRequest(channel: RequestChannel) { | ||||
|     val request = channel.receiveRequest(2000) | ||||
|     assertNotNull("receiveRequest timed out", request) | ||||
|     val request = receiveRequest(channel) | ||||
|     processRequest(channel, request) | ||||
|   } | ||||
| 
 | ||||
|  | @ -115,12 +121,11 @@ class SocketServerTest extends JUnitSuite { | |||
|     sockets.clear() | ||||
|   } | ||||
| 
 | ||||
|   private def producerRequestBytes: Array[Byte] = { | ||||
|   private def producerRequestBytes(ack: Short = 0): Array[Byte] = { | ||||
|     val apiKey: Short = 0 | ||||
|     val correlationId = -1 | ||||
|     val clientId = "" | ||||
|     val ackTimeoutMs = 10000 | ||||
|     val ack = 0: Short | ||||
| 
 | ||||
|     val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs, | ||||
|       new HashMap[TopicPartition, MemoryRecords]()).build() | ||||
|  | @ -133,11 +138,30 @@ class SocketServerTest extends JUnitSuite { | |||
|     serializedBytes | ||||
|   } | ||||
| 
 | ||||
|   private def sendRequestsUntilStagedReceive(server: SocketServer, socket: Socket, requestBytes: Array[Byte]): RequestChannel.Request = { | ||||
|     def sendTwoRequestsReceiveOne(): RequestChannel.Request = { | ||||
|       sendRequest(socket, requestBytes, flush = false) | ||||
|       sendRequest(socket, requestBytes, flush = true) | ||||
|       receiveRequest(server.requestChannel) | ||||
|     } | ||||
|     val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req => | ||||
|       val connectionId = req.connectionId | ||||
|       val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0 | ||||
|       if (!hasStagedReceives) { | ||||
|         processRequest(server.requestChannel, req) | ||||
|         processRequest(server.requestChannel) | ||||
|       } | ||||
|       hasStagedReceives | ||||
|     } | ||||
|     assertTrue(s"Receives not staged for ${org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS} ms", hasStagedReceives) | ||||
|     request | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def simpleRequest() { | ||||
|     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) | ||||
|     val traceSocket = connect(protocol = SecurityProtocol.TRACE) | ||||
|     val serializedBytes = producerRequestBytes | ||||
|     val serializedBytes = producerRequestBytes() | ||||
| 
 | ||||
|     // Test PLAINTEXT socket | ||||
|     sendRequest(plainSocket, serializedBytes) | ||||
|  | @ -171,7 +195,7 @@ class SocketServerTest extends JUnitSuite { | |||
|   @Test | ||||
|   def testGracefulClose() { | ||||
|     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) | ||||
|     val serializedBytes = producerRequestBytes | ||||
|     val serializedBytes = producerRequestBytes() | ||||
| 
 | ||||
|     for (_ <- 0 until 10) | ||||
|       sendRequest(plainSocket, serializedBytes) | ||||
|  | @ -236,7 +260,7 @@ class SocketServerTest extends JUnitSuite { | |||
|     TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length, | ||||
|       "Failed to decrement connection count after close") | ||||
|     val conn2 = connect() | ||||
|     val serializedBytes = producerRequestBytes | ||||
|     val serializedBytes = producerRequestBytes() | ||||
|     sendRequest(conn2, serializedBytes) | ||||
|     val request = server.requestChannel.receiveRequest(2000) | ||||
|     assertNotNull(request) | ||||
|  | @ -255,7 +279,7 @@ class SocketServerTest extends JUnitSuite { | |||
|       val conns = (0 until overrideNum).map(_ => connect(overrideServer)) | ||||
| 
 | ||||
|       // it should succeed | ||||
|       val serializedBytes = producerRequestBytes | ||||
|       val serializedBytes = producerRequestBytes() | ||||
|       sendRequest(conns.last, serializedBytes) | ||||
|       val request = overrideServer.requestChannel.receiveRequest(2000) | ||||
|       assertNotNull(request) | ||||
|  | @ -341,7 +365,7 @@ class SocketServerTest extends JUnitSuite { | |||
|     try { | ||||
|       overrideServer.startup() | ||||
|       conn = connect(overrideServer) | ||||
|       val serializedBytes = producerRequestBytes | ||||
|       val serializedBytes = producerRequestBytes() | ||||
|       sendRequest(conn, serializedBytes) | ||||
| 
 | ||||
|       val channel = overrideServer.requestChannel | ||||
|  | @ -367,6 +391,26 @@ class SocketServerTest extends JUnitSuite { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testClientDisconnectionWithStagedReceivesFullyProcessed() { | ||||
|     val socket = connect(server) | ||||
| 
 | ||||
|     // Setup channel to client with staged receives so when client disconnects | ||||
|     // it will be stored in Selector.closingChannels | ||||
|     val serializedBytes = producerRequestBytes(1) | ||||
|     val request = sendRequestsUntilStagedReceive(server, socket, serializedBytes) | ||||
|     val connectionId = request.connectionId | ||||
| 
 | ||||
|     // Set SoLinger to 0 to force a hard disconnect via TCP RST | ||||
|     socket.setSoLinger(true, 0) | ||||
|     socket.close() | ||||
| 
 | ||||
|     // Complete request with socket exception so that the channel is removed from Selector.closingChannels | ||||
|     processRequest(server.requestChannel, request) | ||||
|     TestUtils.waitUntilTrue(() => server.processor(0).openOrClosingChannel(connectionId).isEmpty, | ||||
|       "Channel not closed after failed send") | ||||
|   } | ||||
| 
 | ||||
|   /* | ||||
|    * Test that we update request metrics if the channel has been removed from the selector when the broker calls | ||||
|    * `selector.send` (selector closes old connections, for example). | ||||
|  | @ -381,7 +425,7 @@ class SocketServerTest extends JUnitSuite { | |||
|     try { | ||||
|       overrideServer.startup() | ||||
|       conn = connect(overrideServer) | ||||
|       val serializedBytes = producerRequestBytes | ||||
|       val serializedBytes = producerRequestBytes() | ||||
|       sendRequest(conn, serializedBytes) | ||||
|       val channel = overrideServer.requestChannel | ||||
|       val request = channel.receiveRequest(2000) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue