MINOR: add retry to state dir locking

There is a possibility that the state directory locking fails when another stream thread is taking long to close all tasks. Simple retries should alleviate the problem.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #899 from ymatsuda/minor2
This commit is contained in:
Yasuhiro Matsuda 2016-02-24 13:41:46 -08:00 committed by Guozhang Wang
parent 01aeea7c7b
commit fa05752ccd
1 changed files with 22 additions and 1 deletions

View File

@ -85,7 +85,7 @@ public class ProcessorStateManager {
createStateDirectory(baseDir);
// try to acquire the exclusive lock on the state directory
directoryLock = lockStateDirectory(baseDir);
directoryLock = lockStateDirectory(baseDir, 5);
if (directoryLock == null) {
throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
}
@ -109,8 +109,27 @@ public class ProcessorStateManager {
}
public static FileLock lockStateDirectory(File stateDir) throws IOException {
return lockStateDirectory(stateDir, 0);
}
private static FileLock lockStateDirectory(File stateDir, int retry) throws IOException {
File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
FileLock lock = lockStateDirectory(channel);
while (lock == null && retry > 0) {
try {
Thread.sleep(200);
} catch (Exception ex) {
// do nothing
}
retry--;
lock = lockStateDirectory(channel);
}
return lock;
}
private static FileLock lockStateDirectory(FileChannel channel) throws IOException {
try {
return channel.tryLock();
} catch (OverlappingFileLockException e) {
@ -118,6 +137,8 @@ public class ProcessorStateManager {
}
}
public File baseDir() {
return this.baseDir;
}