Merge branch '2.5.x'

Closes gh-27471
This commit is contained in:
Andy Wilkinson 2021-07-23 11:02:43 +01:00
commit dd7dd58f41
1 changed files with 47 additions and 53 deletions

View File

@ -44,11 +44,11 @@ import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
@ -1174,8 +1174,8 @@ public abstract class AbstractServletWebServerFactoryTests {
AtomicReference<GracefulShutdownResult> result = new AtomicReference<>(); AtomicReference<GracefulShutdownResult> result = new AtomicReference<>();
this.webServer.shutDownGracefully(result::set); this.webServer.shutDownGracefully(result::set);
this.webServer.stop(); this.webServer.stop();
Awaitility.await().atMost(Duration.ofSeconds(30)) assertThat(Awaitility.await().atMost(Duration.ofSeconds(30)).until(result::get, Objects::nonNull))
.until(() -> GracefulShutdownResult.REQUESTS_ACTIVE == result.get()); .isEqualTo(GracefulShutdownResult.REQUESTS_ACTIVE);
try { try {
blockingServlet.admitOne(); blockingServlet.admitOne();
} }
@ -1524,7 +1524,7 @@ public abstract class AbstractServletWebServerFactoryTests {
protected static class BlockingServlet extends HttpServlet { protected static class BlockingServlet extends HttpServlet {
private final BlockingQueue<CyclicBarrier> barriers = new ArrayBlockingQueue<>(10); private final BlockingQueue<Blocker> blockers = new ArrayBlockingQueue<>(10);
public BlockingServlet() { public BlockingServlet() {
@ -1532,42 +1532,23 @@ public abstract class AbstractServletWebServerFactoryTests {
@Override @Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
CyclicBarrier barrier = new CyclicBarrier(2); Blocker blocker = new Blocker();
this.barriers.add(barrier); this.blockers.add(blocker);
try { blocker.await();
barrier.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
throw new ServletException(ex);
}
} }
public void admitOne() { public void admitOne() throws InterruptedException {
try { this.blockers.take().clear();
CyclicBarrier barrier = this.barriers.take();
if (!barrier.isBroken()) {
barrier.await();
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
} }
public void awaitQueue() throws InterruptedException { public void awaitQueue() throws InterruptedException {
while (this.barriers.isEmpty()) { while (this.blockers.isEmpty()) {
Thread.sleep(100); Thread.sleep(100);
} }
} }
public void awaitQueue(int size) throws InterruptedException { public void awaitQueue(int size) throws InterruptedException {
while (this.barriers.size() < size) { while (this.blockers.size() < size) {
Thread.sleep(100); Thread.sleep(100);
} }
} }
@ -1576,45 +1557,58 @@ public abstract class AbstractServletWebServerFactoryTests {
static class BlockingAsyncServlet extends HttpServlet { static class BlockingAsyncServlet extends HttpServlet {
private final BlockingQueue<CyclicBarrier> barriers = new ArrayBlockingQueue<>(10); private final BlockingQueue<Blocker> blockers = new ArrayBlockingQueue<>(10);
@Override @Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
CyclicBarrier barrier = new CyclicBarrier(2); Blocker blocker = new Blocker();
this.barriers.add(barrier); this.blockers.add(blocker);
AsyncContext async = req.startAsync(); AsyncContext async = req.startAsync();
new Thread(() -> { new Thread(() -> {
try { blocker.await();
barrier.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
}
async.complete(); async.complete();
}).start(); }).start();
} }
private void admitOne() { private void admitOne() throws InterruptedException {
try { this.blockers.take().clear();
this.barriers.take().await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
} }
private void awaitQueue() throws InterruptedException { private void awaitQueue() throws InterruptedException {
while (this.barriers.isEmpty()) { while (this.blockers.isEmpty()) {
Thread.sleep(100); Thread.sleep(100);
} }
} }
} }
private static final class Blocker {
private boolean block = true;
private final Object monitor = new Object();
private void await() {
synchronized (this.monitor) {
while (this.block) {
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
System.out.println("Interrupted!");
// Keep waiting
}
}
}
}
private void clear() {
synchronized (this.monitor) {
this.block = false;
this.monitor.notifyAll();
}
}
}
} }