MINOR: Improve handling of channel close exception

Propagate IOException in SslTransportLayer channel.close to be consistent with PlaintextTransportLayer,  close authenticator on channel close even if transport layer close fails.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1370 from rajinisivaram/minor-channelclose2
This commit is contained in:
Rajini Sivaram 2016-05-11 21:11:17 +01:00 committed by Ismael Juma
parent bd8681cdd5
commit b28bc57a1f
5 changed files with 112 additions and 15 deletions

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.network;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.security.Principal;
@ -27,7 +28,7 @@ import org.apache.kafka.common.KafkaException;
/**
* Authentication for Channel
*/
public interface Authenticator {
public interface Authenticator extends Closeable {
/**
* Configures Authenticator using the provided parameters.
@ -54,11 +55,4 @@ public interface Authenticator {
*/
boolean complete();
/**
* Closes this Authenticator
*
* @throws IOException if any I/O error occurs
*/
void close() throws IOException;
}

View File

@ -26,6 +26,8 @@ import java.nio.channels.SelectionKey;
import java.security.Principal;
import org.apache.kafka.common.utils.Utils;
public class KafkaChannel {
private final String id;
private final TransportLayer transportLayer;
@ -42,8 +44,7 @@ public class KafkaChannel {
}
public void close() throws IOException {
transportLayer.close();
authenticator.close();
Utils.closeAll(transportLayer, authenticator);
}
/**

View File

@ -141,7 +141,7 @@ public class SslTransportLayer implements TransportLayer {
* Sends a SSL close message and closes socketChannel.
*/
@Override
public void close() {
public void close() throws IOException {
if (closing) return;
closing = true;
sslEngine.closeOutbound();
@ -168,13 +168,12 @@ public class SslTransportLayer implements TransportLayer {
try {
socketChannel.socket().close();
socketChannel.close();
} catch (IOException e) {
log.warn("Failed to close SSL socket channel: " + e);
}
}
} finally {
key.attach(null);
key.cancel();
}
}
}
/**
* returns true if there are any pending contents in netWriteBuffer

View File

@ -14,6 +14,7 @@ package org.apache.kafka.common.utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.OutputStream;
@ -676,4 +677,26 @@ public class Utils {
}
}
/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
* The first IOException is thrown with subsequent exceptions
* added as suppressed exceptions.
*/
public static void closeAll(Closeable... closeables) throws IOException {
IOException exception = null;
for (Closeable closeable : closeables) {
try {
closeable.close();
} catch (IOException e) {
if (exception != null)
exception.addSuppressed(e);
else
exception = e;
}
}
if (exception != null)
throw exception;
}
}

View File

@ -18,6 +18,8 @@ package org.apache.kafka.common.utils;
import java.util.Arrays;
import java.util.Collections;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;
@ -26,6 +28,8 @@ import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class UtilsTest {
@ -114,4 +118,80 @@ public class UtilsTest {
assertEquals(1, Utils.min(2, 1, 3));
assertEquals(1, Utils.min(2, 3, 1));
}
@Test
public void testCloseAll() {
TestCloseable[] closeablesWithoutException = TestCloseable.createCloseables(false, false, false);
try {
Utils.closeAll(closeablesWithoutException);
TestCloseable.checkClosed(closeablesWithoutException);
} catch (IOException e) {
fail("Unexpected exception: " + e);
}
TestCloseable[] closeablesWithException = TestCloseable.createCloseables(true, true, true);
try {
Utils.closeAll(closeablesWithException);
fail("Expected exception not thrown");
} catch (IOException e) {
TestCloseable.checkClosed(closeablesWithException);
TestCloseable.checkException(e, closeablesWithException);
}
TestCloseable[] singleExceptionCloseables = TestCloseable.createCloseables(false, true, false);
try {
Utils.closeAll(singleExceptionCloseables);
fail("Expected exception not thrown");
} catch (IOException e) {
TestCloseable.checkClosed(singleExceptionCloseables);
TestCloseable.checkException(e, singleExceptionCloseables[1]);
}
TestCloseable[] mixedCloseables = TestCloseable.createCloseables(false, true, false, true, true);
try {
Utils.closeAll(mixedCloseables);
fail("Expected exception not thrown");
} catch (IOException e) {
TestCloseable.checkClosed(mixedCloseables);
TestCloseable.checkException(e, mixedCloseables[1], mixedCloseables[3], mixedCloseables[4]);
}
}
private static class TestCloseable implements Closeable {
private final int id;
private final IOException closeException;
private boolean closed;
TestCloseable(int id, boolean exceptionOnClose) {
this.id = id;
this.closeException = exceptionOnClose ? new IOException("Test close exception " + id) : null;
}
@Override
public void close() throws IOException {
closed = true;
if (closeException != null)
throw closeException;
}
static TestCloseable[] createCloseables(boolean... exceptionOnClose) {
TestCloseable[] closeables = new TestCloseable[exceptionOnClose.length];
for (int i = 0; i < closeables.length; i++)
closeables[i] = new TestCloseable(i, exceptionOnClose[i]);
return closeables;
}
static void checkClosed(TestCloseable... closeables) {
for (TestCloseable closeable : closeables)
assertTrue("Close not invoked for " + closeable.id, closeable.closed);
}
static void checkException(IOException e, TestCloseable... closeablesWithException) {
assertEquals(closeablesWithException[0].closeException, e);
Throwable[] suppressed = e.getSuppressed();
assertEquals(closeablesWithException.length - 1, suppressed.length);
for (int i = 1; i < closeablesWithException.length; i++)
assertEquals(closeablesWithException[i].closeException, suppressed[i - 1]);
}
}
}