mirror of https://github.com/apache/kafka.git
KAFKA-17310 locking the offline dir can destroy the broker exceptionally (#16856)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
bbdf79e1b4
commit
5d115fea3b
|
@ -37,14 +37,10 @@ public class FileLock {
|
||||||
private final FileChannel channel;
|
private final FileChannel channel;
|
||||||
private java.nio.channels.FileLock flock;
|
private java.nio.channels.FileLock flock;
|
||||||
|
|
||||||
public FileLock(File file) {
|
public FileLock(File file) throws IOException {
|
||||||
this.file = file;
|
this.file = file;
|
||||||
try {
|
|
||||||
this.channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
|
this.channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
|
||||||
StandardOpenOption.WRITE);
|
StandardOpenOption.WRITE);
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public File file() {
|
public File file() {
|
||||||
|
@ -54,19 +50,15 @@ public class FileLock {
|
||||||
/**
|
/**
|
||||||
* Lock the file or throw an exception if the lock is already held
|
* Lock the file or throw an exception if the lock is already held
|
||||||
*/
|
*/
|
||||||
public synchronized void lock() {
|
public synchronized void lock() throws IOException {
|
||||||
LOGGER.trace("Acquiring lock on {}", file.getAbsolutePath());
|
LOGGER.trace("Acquiring lock on {}", file.getAbsolutePath());
|
||||||
try {
|
|
||||||
flock = channel.lock();
|
flock = channel.lock();
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to lock the file and return true if the locking succeeds
|
* Try to lock the file and return true if the locking succeeds
|
||||||
*/
|
*/
|
||||||
public synchronized boolean tryLock() {
|
public synchronized boolean tryLock() throws IOException {
|
||||||
LOGGER.trace("Acquiring lock on {}", file.getAbsolutePath());
|
LOGGER.trace("Acquiring lock on {}", file.getAbsolutePath());
|
||||||
try {
|
try {
|
||||||
// weirdly this method will return null if the lock is held by another
|
// weirdly this method will return null if the lock is held by another
|
||||||
|
@ -76,37 +68,27 @@ public class FileLock {
|
||||||
return flock != null;
|
return flock != null;
|
||||||
} catch (OverlappingFileLockException e) {
|
} catch (OverlappingFileLockException e) {
|
||||||
return false;
|
return false;
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unlock the lock if it is held
|
* Unlock the lock if it is held
|
||||||
*/
|
*/
|
||||||
public synchronized void unlock() {
|
public synchronized void unlock() throws IOException {
|
||||||
LOGGER.trace("Releasing lock on {}", file.getAbsolutePath());
|
LOGGER.trace("Releasing lock on {}", file.getAbsolutePath());
|
||||||
if (flock != null) {
|
if (flock != null) {
|
||||||
try {
|
|
||||||
flock.release();
|
flock.release();
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy this lock, closing the associated FileChannel
|
* Destroy this lock, closing the associated FileChannel
|
||||||
*/
|
*/
|
||||||
public synchronized void destroy() {
|
public synchronized void destroy() throws IOException {
|
||||||
unlock();
|
unlock();
|
||||||
try {
|
|
||||||
if (file.exists() && file.delete()) {
|
if (file.exists() && file.delete()) {
|
||||||
LOGGER.trace("Deleted {}", file.getAbsolutePath());
|
LOGGER.trace("Deleted {}", file.getAbsolutePath());
|
||||||
}
|
}
|
||||||
channel.close();
|
channel.close();
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -107,7 +108,7 @@ public final class MetadataShell {
|
||||||
* this hole would require the parent directory to always be writable when loading a
|
* this hole would require the parent directory to always be writable when loading a
|
||||||
* snapshot so that we could create our .lock file there.
|
* snapshot so that we could create our .lock file there.
|
||||||
*/
|
*/
|
||||||
static FileLock takeDirectoryLockIfExists(File directory) {
|
static FileLock takeDirectoryLockIfExists(File directory) throws IOException {
|
||||||
if (new File(directory, ".lock").exists()) {
|
if (new File(directory, ".lock").exists()) {
|
||||||
return takeDirectoryLock(directory);
|
return takeDirectoryLock(directory);
|
||||||
} else {
|
} else {
|
||||||
|
@ -118,7 +119,7 @@ public final class MetadataShell {
|
||||||
/**
|
/**
|
||||||
* Take the FileLock in the given directory.
|
* Take the FileLock in the given directory.
|
||||||
*/
|
*/
|
||||||
static FileLock takeDirectoryLock(File directory) {
|
static FileLock takeDirectoryLock(File directory) throws IOException {
|
||||||
FileLock fileLock = new FileLock(new File(directory, ".lock"));
|
FileLock fileLock = new FileLock(new File(directory, ".lock"));
|
||||||
try {
|
try {
|
||||||
if (!fileLock.tryLock()) {
|
if (!fileLock.tryLock()) {
|
||||||
|
|
Loading…
Reference in New Issue