MINOR: Avoid FileInputStream/FileOutputStream (#5281)

They rely on finalizers (before Java 11), which create
unnecessary GC load. The alternatives are as easy to
use and don't have this issue.

Also use FileChannel directly instead of retrieving
it from RandomAccessFile whenever possible
since the indirection is unnecessary.

Finally, add a few try/finally blocks.

Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Ismael Juma 2018-06-27 01:00:05 -07:00 committed by GitHub
parent a20555102b
commit 7a74ec62d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 141 additions and 134 deletions

View File

@ -19,12 +19,10 @@ package org.apache.kafka.common.config.provider;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigException;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@ -97,7 +95,7 @@ public class FileConfigProvider implements ConfigProvider {
// visible for testing
protected Reader reader(String path) throws IOException {
return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
return Files.newBufferedReader(Paths.get(path));
}
public void close() {

View File

@ -25,13 +25,13 @@ import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
@ -426,19 +426,16 @@ public class FileRecords extends AbstractRecords implements Closeable {
int initFileSize,
boolean preallocate) throws IOException {
if (mutable) {
if (fileAlreadyExists) {
return new RandomAccessFile(file, "rw").getChannel();
if (fileAlreadyExists || !preallocate) {
return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE);
} else {
if (preallocate) {
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.setLength(initFileSize);
return randomAccessFile.getChannel();
} else {
return new RandomAccessFile(file, "rw").getChannel();
}
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.setLength(initFileSize);
return randomAccessFile.getChannel();
}
} else {
return new FileInputStream(file).getChannel();
return FileChannel.open(file.toPath());
}
}

View File

@ -33,9 +33,11 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Principal;
@ -320,7 +322,7 @@ public class SslFactory implements Reconfigurable {
* using the specified configs (e.g. if the password or keystore type is invalid)
*/
KeyStore load() {
try (FileInputStream in = new FileInputStream(path)) {
try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;

View File

@ -24,7 +24,6 @@ import java.io.Closeable;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
@ -41,6 +40,7 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.text.DecimalFormat;
@ -520,7 +520,7 @@ public final class Utils {
Properties props = new Properties();
if (filename != null) {
try (InputStream propStream = new FileInputStream(filename)) {
try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
props.load(propStream);
}
} else {
@ -590,8 +590,7 @@ public final class Utils {
public static String readFileAsString(String path, Charset charset) throws IOException {
if (charset == null) charset = Charset.defaultCharset();
try (FileInputStream stream = new FileInputStream(new File(path))) {
FileChannel fc = stream.getChannel();
try (FileChannel fc = FileChannel.open(Paths.get(path))) {
MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
return charset.decode(bb).toString();
}

View File

@ -26,9 +26,9 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -160,7 +160,7 @@ public class LazyDownConversionRecordsTest {
inputRecords, toMagic, 0L, Time.SYSTEM);
LazyDownConversionRecordsSend lazySend = lazyRecords.toSend("foo");
File outputFile = tempFile();
FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
FileChannel channel = FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
int written = 0;
while (written < bytesToConvert)

View File

@ -20,15 +20,17 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.EOFException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetAddress;
import javax.net.ssl.TrustManagerFactory;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyPair;
@ -99,7 +101,7 @@ public class TestSslUtils {
private static void saveKeyStore(KeyStore ks, String filename,
Password password) throws GeneralSecurityException, IOException {
try (FileOutputStream out = new FileOutputStream(filename)) {
try (OutputStream out = Files.newOutputStream(Paths.get(filename))) {
ks.store(out, password.value().toCharArray());
}
}
@ -137,7 +139,7 @@ public class TestSslUtils {
public static <T extends Certificate> void createTrustStore(
String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException {
KeyStore ks = KeyStore.getInstance("JKS");
try (FileInputStream in = new FileInputStream(filename)) {
try (InputStream in = Files.newInputStream(Paths.get(filename))) {
ks.load(in, password.value().toCharArray());
} catch (EOFException e) {
ks = createEmptyKeyStore();

View File

@ -24,11 +24,12 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Map;
@ -62,9 +63,9 @@ public class FileStreamSinkTask extends SinkTask {
outputStream = System.out;
} else {
try {
outputStream = new PrintStream(new FileOutputStream(filename, true), false,
outputStream = new PrintStream(Files.newOutputStream(Paths.get(filename), StandardOpenOption.APPEND), false,
StandardCharsets.UTF_8.name());
} catch (FileNotFoundException | UnsupportedEncodingException e) {
} catch (IOException e) {
throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
}
}

View File

@ -17,12 +17,13 @@
package org.apache.kafka.connect.file;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -78,7 +79,7 @@ public class FileStreamSourceTask extends SourceTask {
public List<SourceRecord> poll() throws InterruptedException {
if (stream == null) {
try {
stream = new FileInputStream(filename);
stream = Files.newInputStream(Paths.get(filename));
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Object lastRecordedOffset = offset.get(POSITION_FIELD);
@ -92,7 +93,7 @@ public class FileStreamSourceTask extends SourceTask {
long skipped = stream.skip(skipLeft);
skipLeft -= skipped;
} catch (IOException e) {
log.error("Error while trying to seek to previous offset in file: ", e);
log.error("Error while trying to seek to previous offset in file {}: ", filename, e);
throw new ConnectException(e);
}
}
@ -104,12 +105,15 @@ public class FileStreamSourceTask extends SourceTask {
}
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
log.debug("Opened {} for reading", logFilename());
} catch (FileNotFoundException e) {
} catch (NoSuchFileException e) {
log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
synchronized (this) {
this.wait(1000);
}
return null;
} catch (IOException e) {
log.error("Error while trying to open file {}: ", filename, e);
throw new ConnectException(e);
}
}

View File

@ -27,8 +27,9 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -81,7 +82,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
task.start(config);
FileOutputStream os = new FileOutputStream(tempFile);
OutputStream os = Files.newOutputStream(tempFile.toPath());
assertEquals(null, task.poll());
os.write("partial line".getBytes());
os.flush();
@ -135,7 +136,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "5000");
task.start(config);
FileOutputStream os = new FileOutputStream(tempFile);
OutputStream os = Files.newOutputStream(tempFile.toPath());
for (int i = 0; i < 10_000; i++) {
os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
}

View File

@ -25,12 +25,11 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Map;
@ -69,7 +68,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
@SuppressWarnings("unchecked")
private void load() {
try (SafeObjectInputStream is = new SafeObjectInputStream(new FileInputStream(file))) {
try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new ConnectException("Expected HashMap but found " + obj.getClass());
@ -80,8 +79,8 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
data.put(key, value);
}
} catch (FileNotFoundException | EOFException e) {
// FileNotFoundException: Ignore, may be new.
} catch (NoSuchFileException | EOFException e) {
// NoSuchFileException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
} catch (IOException | ClassNotFoundException e) {
throw new ConnectException(e);
@ -90,7 +89,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
@Override
protected void save() {
try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(file.toPath()))) {
Map<byte[], byte[]> raw = new HashMap<>();
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;

View File

@ -17,7 +17,7 @@
package kafka.log
import java.io.{File, RandomAccessFile}
import java.io.{Closeable, File, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.{ByteBuffer, MappedByteBuffer}
@ -39,7 +39,7 @@ import scala.math.ceil
* @param maxIndexSize The maximum index size in bytes.
*/
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable with Logging {
// Length of the index file
@volatile

View File

@ -16,9 +16,10 @@
*/
package kafka.log
import java.io._
import java.io.File
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import kafka.log.Log.offsetFromFile
import kafka.server.LogOffsetMetadata
@ -430,12 +431,9 @@ object ProducerStateManager {
val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit() - ProducerEntriesOffset)
ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
val fos = new FileOutputStream(file)
try {
fos.write(buffer.array, buffer.arrayOffset, buffer.limit())
} finally {
fos.close()
}
val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
try fileChannel.write(buffer)
finally fileChannel.close()
}
private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix)

View File

@ -81,8 +81,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
}
private def openChannel(): FileChannel = {
val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.CREATE)
val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE)
maybeChannel = Some(channel)
channel.position(channel.size)
channel

View File

@ -18,8 +18,9 @@
package kafka.server
import java.io._
import java.nio.file.Files
import java.nio.file.{Files, NoSuchFileException}
import java.util.Properties
import kafka.utils._
import org.apache.kafka.common.utils.Utils
@ -70,7 +71,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
throw new IOException("Unrecognized version of the server meta.properties file: " + version)
}
} catch {
case _: FileNotFoundException =>
case _: NoSuchFileException =>
warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
None
case e1: Exception =>

View File

@ -84,7 +84,7 @@ class CheckpointFile[T](val file: File,
new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'")
lock synchronized {
try {
val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
val reader = Files.newBufferedReader(path)
var line: String = null
try {
line = reader.readLine()

View File

@ -18,7 +18,7 @@
import java.io._
import java.nio.channels._
import java.nio.file.{FileAlreadyExistsException, Files}
import java.nio.file.StandardOpenOption
/**
* A file lock a la flock/funlock
@ -27,10 +27,8 @@ import java.nio.file.{FileAlreadyExistsException, Files}
*/
class FileLock(val file: File) extends Logging {
try Files.createFile(file.toPath) // create the file if it doesn't exist
catch { case _: FileAlreadyExistsException => }
private val channel = new RandomAccessFile(file, "rw").getChannel()
private val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE)
private var flock: java.nio.channels.FileLock = null
/**

View File

@ -12,7 +12,7 @@
*/
package kafka.api
import java.io.FileOutputStream
import java.nio.file.Files
import java.util.Collections
import java.util.concurrent.{ExecutionException, TimeUnit}
@ -159,7 +159,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
@Test
def testConsumerGroupServiceWithAuthenticationFailure() {
val propsFile = TestUtils.tempFile()
val propsStream = new FileOutputStream(propsFile)
val propsStream = Files.newOutputStream(propsFile.toPath)
propsStream.write("security.protocol=SASL_PLAINTEXT\n".getBytes())
propsStream.write(s"sasl.mechanism=$kafkaClientSaslMechanism".getBytes())
propsStream.close()

View File

@ -18,15 +18,15 @@
package kafka.server
import java.io.{Closeable, File, FileInputStream, FileWriter}
import java.nio.file.{Files, StandardCopyOption}
import java.io.{Closeable, File, FileWriter}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.lang.management.ManagementFactory
import java.security.KeyStore
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._
import javax.management.ObjectName
import javax.management.ObjectName
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
@ -57,7 +57,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.test.TestSslUtils
import org.junit.Assert._
import org.junit.{After, Before, Test, Ignore}
import org.junit.{After, Before, Ignore, Test}
import scala.collection._
import scala.collection.mutable.ArrayBuffer
@ -1056,13 +1056,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
def load(props: Properties): KeyStore = {
val ks = KeyStore.getInstance("JKS")
val password = props.get(SSL_TRUSTSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value
val in = new FileInputStream(props.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG))
val in = Files.newInputStream(Paths.get(props.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
try {
ks.load(in, password.toCharArray)
ks
} finally {
in.close()
}
ks
}
val cert1 = load(trustStore1Props).getCertificate("kafka")
val cert2 = load(trustStore2Props).getCertificate("kafka")

View File

@ -17,9 +17,10 @@
package other.kafka
import java.io.{File, FileOutputStream, PrintWriter}
import javax.imageio.ImageIO
import java.io.{File, PrintWriter}
import java.nio.file.{Files, StandardOpenOption}
import javax.imageio.ImageIO
import kafka.admin.ReassignPartitionsCommand
import kafka.admin.ReassignPartitionsCommand.Throttle
import org.apache.kafka.common.TopicPartition
@ -310,7 +311,7 @@ object ReplicationQuotasTestRig {
}
def append(message: String): Unit = {
val stream = new FileOutputStream(log, true)
val stream = Files.newOutputStream(log.toPath, StandardOpenOption.APPEND)
new PrintWriter(stream) {
append(message)
close

View File

@ -20,6 +20,7 @@ package kafka
import java.io._
import java.nio._
import java.nio.channels._
import java.nio.file.StandardOpenOption
import java.util.{Properties, Random}
import joptsimple._
@ -188,20 +189,22 @@ object TestLinearWriteSpeed {
}
def close() {
raf.close()
Utils.delete(file)
}
}
class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable {
file.deleteOnExit()
val raf = new RandomAccessFile(file, "rw")
val channel = raf.getChannel
val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE)
def write(): Int = {
channel.write(content)
content.rewind()
content.limit()
}
def close() {
raf.close()
channel.close()
Utils.delete(file)
}
}

View File

@ -19,6 +19,8 @@ package kafka
import java.io._
import java.nio._
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
/* This code tests the correct function of java's FileChannel.truncate--some platforms don't work. */
object TestTruncate {
@ -26,7 +28,7 @@ object TestTruncate {
def main(args: Array[String]): Unit = {
val name = File.createTempFile("kafka", ".test")
name.deleteOnExit()
val file = new RandomAccessFile(name, "rw").getChannel()
val file = FileChannel.open(name.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
val buffer = ByteBuffer.allocate(12)
buffer.putInt(4).putInt(4).putInt(4)
buffer.rewind()

View File

@ -16,7 +16,8 @@
*/
package kafka
import java.io.{File, FileOutputStream}
import java.io.File
import java.nio.file.Files
import java.util
import kafka.server.KafkaConfig
@ -104,14 +105,13 @@ class KafkaTest {
val file = File.createTempFile("kafkatest", ".properties")
file.deleteOnExit()
val writer = new FileOutputStream(file)
lines.foreach { l =>
writer.write(l.getBytes)
writer.write("\n".getBytes)
}
writer.close
file.getAbsolutePath
val writer = Files.newOutputStream(file.toPath)
try {
lines.foreach { l =>
writer.write(l.getBytes)
writer.write("\n".getBytes)
}
file.getAbsolutePath
} finally writer.close()
}
}

View File

@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package unit.kafka.server
package kafka.server
import java.util
import java.util.Properties
import kafka.log.LogConfig
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition

View File

@ -17,7 +17,8 @@
package kafka.tools
import java.io.{FileOutputStream, PrintStream}
import java.io.PrintStream
import java.nio.file.Files
import kafka.common.MessageFormatter
import kafka.tools.ConsoleConsumer.ConsumerWrapper
@ -302,7 +303,7 @@ class ConsoleConsumerTest {
@Test
def shouldParseConfigsFromFile() {
val propsFile = TestUtils.tempFile()
val propsStream = new FileOutputStream(propsFile)
val propsStream = Files.newOutputStream(propsFile.toPath)
propsStream.write("request.timeout.ms=1000\n".getBytes())
propsStream.write("group.id=group1".getBytes())
propsStream.close()
@ -324,7 +325,7 @@ class ConsoleConsumerTest {
// different in all three places
var propsFile = TestUtils.tempFile()
var propsStream = new FileOutputStream(propsFile)
var propsStream = Files.newOutputStream(propsFile.toPath)
propsStream.write("group.id=group-from-file".getBytes())
propsStream.close()
var args: Array[String] = Array(
@ -344,7 +345,7 @@ class ConsoleConsumerTest {
// the same in all three places
propsFile = TestUtils.tempFile()
propsStream = new FileOutputStream(propsFile)
propsStream = Files.newOutputStream(propsFile.toPath)
propsStream.write("group.id=test-group".getBytes())
propsStream.close()
args = Array(
@ -361,7 +362,7 @@ class ConsoleConsumerTest {
// different via --consumer-property and --consumer.config
propsFile = TestUtils.tempFile()
propsStream = new FileOutputStream(propsFile)
propsStream = Files.newOutputStream(propsFile.toPath)
propsStream.write("group.id=group-from-file".getBytes())
propsStream.close()
args = Array(
@ -395,7 +396,7 @@ class ConsoleConsumerTest {
// different via --group and --consumer.config
propsFile = TestUtils.tempFile()
propsStream = new FileOutputStream(propsFile)
propsStream = Files.newOutputStream(propsFile.toPath)
propsStream.write("group.id=group-from-file".getBytes())
propsStream.close()
args = Array(

View File

@ -21,12 +21,13 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{Callable, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
import kafka.log._
@ -119,7 +120,8 @@ object TestUtils extends Logging {
/**
* Create a temporary file and return an open file channel for this file
*/
def tempChannel(): FileChannel = new RandomAccessFile(tempFile(), "rw").getChannel()
def tempChannel(): FileChannel =
FileChannel.open(tempFile().toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
/**
* Create a kafka server instance with appropriate test settings
@ -879,11 +881,12 @@ object TestUtils extends Logging {
file.close()
}
def appendNonsenseToFile(fileName: File, size: Int) {
val file = new FileOutputStream(fileName, true)
for (_ <- 0 until size)
file.write(random.nextInt(255))
file.close()
def appendNonsenseToFile(file: File, size: Int) {
val outputStream = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND)
try {
for (_ <- 0 until size)
outputStream.write(random.nextInt(255))
} finally outputStream.close()
}
def checkForPhantomInSyncReplicas(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {

View File

@ -18,9 +18,10 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Objects;
/**
@ -68,8 +69,8 @@ public class Printed<K, V> {
throw new TopologyException("filePath can't be an empty string");
}
try {
return new Printed<>(new FileOutputStream(filePath));
} catch (final FileNotFoundException e) {
return new Printed<>(Files.newOutputStream(Paths.get(filePath)));
} catch (final IOException e) {
throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
}
}

View File

@ -23,14 +23,12 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -123,9 +121,7 @@ public class OffsetCheckpoint {
*/
public Map<TopicPartition, Long> read() throws IOException {
synchronized (lock) {
try (BufferedReader reader
= new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
final int version = readInt(reader);
switch (version) {
case 0:
@ -154,7 +150,7 @@ public class OffsetCheckpoint {
default:
throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
}
} catch (final FileNotFoundException e) {
} catch (final NoSuchFileException e) {
return Collections.emptyMap();
}
}

View File

@ -28,11 +28,12 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@ -63,7 +64,7 @@ public class PrintedTest {
final Processor<String, Integer> processor = processorSupplier.get();
processor.process("hi", 1);
processor.close();
try (final FileInputStream stream = new FileInputStream(file)) {
try (final InputStream stream = Files.newInputStream(file.toPath())) {
final byte[] data = new byte[stream.available()];
stream.read(data);
assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]: hi, 1\n"));
@ -130,4 +131,4 @@ public class PrintedTest {
public void shouldThrowTopologyExceptionIfFilePathDoesntExist() {
Printed.toFile("/this/should/not/exist");
}
}
}

View File

@ -43,8 +43,9 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -658,7 +659,7 @@ public class GlobalStateManagerImplTest {
private void writeCorruptCheckpoint() throws IOException {
final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
try (final OutputStream stream = Files.newOutputStream(checkpointFile.toPath())) {
stream.write("0\n1\nfoo".getBytes());
}
}

View File

@ -25,10 +25,10 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -161,9 +161,9 @@ public class VerifiableLog4jAppender {
* we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
*/
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
public static Properties loadProps(String filename) throws IOException {
Properties props = new Properties();
try (InputStream propStream = new FileInputStream(filename)) {
try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
props.load(propStream);
}
return props;

View File

@ -35,10 +35,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -198,9 +198,9 @@ public class VerifiableProducer {
* we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
*/
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
public static Properties loadProps(String filename) throws IOException {
Properties props = new Properties();
try (InputStream propStream = new FileInputStream(filename)) {
try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
props.load(propStream);
}
return props;

View File

@ -26,7 +26,6 @@ import org.junit.rules.Timeout;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -41,7 +40,7 @@ public class BasicPlatformTest {
public void testCreateBasicPlatform() throws Exception {
File configFile = TestUtils.tempFile();
try {
try (OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(configFile),
try (OutputStreamWriter writer = new OutputStreamWriter(Files.newOutputStream(configFile.toPath()),
StandardCharsets.UTF_8)) {
writer.write("{\n");
writer.write(" \"platform\": \"org.apache.kafka.trogdor.basic.BasicPlatform\",\n");