mirror of https://github.com/jenkinsci/jenkins.git
Add a Queue.tryWithLock() method that accepts a timeout. (#11033)
Co-authored-by: Kris Stern <krisstern@outlook.com>
This commit is contained in:
parent
066f4c2fc2
commit
863a8a46b6
|
@ -85,6 +85,7 @@ import java.nio.channels.ClosedByInterruptException;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.InvalidPathException;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
|
@ -1348,6 +1349,26 @@ public class Queue extends ResourceController implements Saveable {
|
|||
return queue._tryWithLock(runnable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the supplied {@link Runnable} if the {@link Queue} lock was obtained within the given timeout.
|
||||
*
|
||||
* @param runnable the operation to perform.
|
||||
* @return {@code true} if the lock was acquired within the timeout and the operation was performed.
|
||||
* @since TODO
|
||||
*/
|
||||
public static boolean tryWithLock(Runnable runnable, Duration timeout) throws InterruptedException {
|
||||
final Jenkins jenkins = Jenkins.getInstanceOrNull();
|
||||
// TODO confirm safe to assume non-null and use getInstance()
|
||||
final Queue queue = jenkins == null ? null : jenkins.getQueue();
|
||||
if (queue == null) {
|
||||
runnable.run();
|
||||
return true;
|
||||
} else {
|
||||
return queue._tryWithLock(runnable, timeout);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a {@link Runnable} with the {@link Queue} lock held.
|
||||
*
|
||||
|
@ -1435,6 +1456,26 @@ public class Queue extends ResourceController implements Saveable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the supplied {@link Runnable} if the {@link Queue} lock was obtained within the given timeout
|
||||
*
|
||||
* @param runnable the operation to perform.
|
||||
* @return {@code true} if the lock was acquired within the timeout and the operation was performed.
|
||||
* @since TODO
|
||||
*/
|
||||
protected boolean _tryWithLock(Runnable runnable, Duration timeout) throws InterruptedException {
|
||||
if (lock.tryLock(timeout.toNanos(), TimeUnit.NANOSECONDS)) {
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
|
||||
* than locking directly on Queue in order to allow for future refactoring.
|
||||
|
|
|
@ -91,6 +91,7 @@ import java.net.HttpURLConnection;
|
|||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -99,10 +100,15 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
|
@ -451,6 +457,106 @@ public class QueueTest {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void tryWithTimeoutSuccessfullyAcquired() throws InterruptedException, ExecutionException {
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
final CountDownLatch task1Started = new CountDownLatch(1);
|
||||
final CountDownLatch task1Release = new CountDownLatch(1);
|
||||
|
||||
Future<Void> task1 = executor.submit(Queue.wrapWithLock(() -> {
|
||||
task1Started.countDown();
|
||||
try {
|
||||
task1Release.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
}));
|
||||
|
||||
// Wait for the first task to be started to ensure it is running and has the lock
|
||||
task1Started.await();
|
||||
|
||||
// Create a task that will need to wait until the first task is complete that will fail to acquire.
|
||||
final AtomicBoolean task2Result = new AtomicBoolean(false);
|
||||
boolean acquired = Queue.tryWithLock(() -> {
|
||||
task2Result.set(true);
|
||||
}, Duration.ofMillis(10));
|
||||
assertFalse(acquired);
|
||||
assertFalse(task2Result.get());
|
||||
|
||||
// Now release the first task and wait (with a long timeout) and we should succeed at getting the lock.
|
||||
final AtomicBoolean task3Result = new AtomicBoolean(false);
|
||||
task1Release.countDown();
|
||||
acquired = Queue.tryWithLock(() -> {
|
||||
task3Result.set(true);
|
||||
}, Duration.ofSeconds(30));
|
||||
|
||||
// First task should complete
|
||||
task1.get();
|
||||
|
||||
// Task 2 should have acquired and completed
|
||||
assertTrue(acquired);
|
||||
assertTrue(task3Result.get());
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void tryWithTimeoutFailedToAcquire() throws InterruptedException, ExecutionException {
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
// Submit one task that will block indefinitely until released
|
||||
final CountDownLatch task1Started = new CountDownLatch(1);
|
||||
final CountDownLatch task1Release = new CountDownLatch(1);
|
||||
Future<Void> task1 = executor.submit(Queue.wrapWithLock(() -> {
|
||||
task1Started.countDown();
|
||||
try {
|
||||
task1Release.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
}));
|
||||
|
||||
// Wait for task 1 to start
|
||||
task1Started.await();
|
||||
|
||||
// Try to acquire lock with 50ms timeout, expecting that it cannot be acquired
|
||||
final AtomicBoolean task2Complete = new AtomicBoolean(false);
|
||||
boolean result = Queue.tryWithLock(() -> {
|
||||
task2Complete.set(true);
|
||||
}, Duration.ofMillis(50));
|
||||
|
||||
// Results should indicate the task did not run
|
||||
assertFalse(result);
|
||||
assertFalse(task2Complete.get());
|
||||
|
||||
// Now release the first task and wait for it to finish
|
||||
task1Release.countDown();
|
||||
task1.get();
|
||||
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void tryWithTimeoutImmediatelyAcquired() throws InterruptedException {
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
final AtomicBoolean taskComplete = new AtomicBoolean(false);
|
||||
boolean result = Queue.tryWithLock(() -> {
|
||||
taskComplete.set(true);
|
||||
}, Duration.ofMillis(1));
|
||||
assertTrue(result);
|
||||
assertTrue(taskComplete.get());
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Issue("JENKINS-27256")
|
||||
@Test
|
||||
void inQueueTaskLookupByAPI() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue