KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory to server module (#19390)

Rewrite these classes in Java and move them to the server module

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
PoAn Yang 2025-04-09 17:52:56 +08:00 committed by GitHub
parent 5148174196
commit 56591d2d07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 279 additions and 210 deletions

View File

@ -32,9 +32,11 @@ import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.purgatory.DelayedFuturePurgatory
import java.util
import java.util.concurrent.CompletableFuture
import java.util.stream.Collectors
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@ -50,7 +52,7 @@ class AclApis(authHelper: AuthHelper,
config: KafkaConfig) extends Logging {
this.logIdent = "[AclApis-%s-%s] ".format(role, config.nodeId)
private val alterAclsPurgatory =
new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.nodeId)
new DelayedFuturePurgatory("AlterAcls", config.nodeId)
def isClosed: Boolean = alterAclsPurgatory.isShutdown
@ -107,11 +109,11 @@ class AclApis(authHelper: AuthHelper,
}
val future = new CompletableFuture[util.List[AclCreationResult]]()
val createResults = auth.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)
val createResults = auth.createAcls(request.context, validBindings.asJava).stream().map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
val aclCreationResults = allBindings.map { acl =>
val result = errorResults.getOrElse(acl, createResults(validBindings.indexOf(acl)).get)
val result = errorResults.getOrElse(acl, createResults.get(validBindings.indexOf(acl)).get)
val creationResult = new AclCreationResult()
result.exception.toScala.foreach { throwable =>
val apiError = ApiError.fromThrowable(throwable)
@ -123,7 +125,7 @@ class AclApis(authHelper: AuthHelper,
}
future.complete(aclCreationResults.asJava)
}
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, () => sendResponseCallback())
future.thenApply[Unit] { aclCreationResults =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@ -147,14 +149,15 @@ class AclApis(authHelper: AuthHelper,
val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
.asScala.map(_.toCompletableFuture).toList
.stream().map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
val filterResults = deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
val filterResults: util.List[DeleteAclsFilterResult] = deleteResults.stream().map(_.get)
.map(DeleteAclsResponse.filterResult).collect(Collectors.toList())
future.complete(filterResults)
}
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, () => sendResponseCallback())
future.thenApply[Unit] { filterResults =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DeleteAclsResponse(

View File

@ -1,107 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.Logging
import java.util
import java.util.concurrent._
import java.util.function.BiConsumer
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.KafkaThread
import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationKey, DelayedOperationPurgatory}
import scala.collection.Seq
/**
* A delayed operation using CompletionFutures that can be created by KafkaApis and watched
* in a DelayedFuturePurgatory purgatory. This is used for ACL updates using async Authorizers.
*/
class DelayedFuture[T](timeoutMs: Long,
futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit)
extends DelayedOperation(timeoutMs) with Logging {
/**
* The operation can be completed if all the futures have completed successfully
* or failed with exceptions.
*/
override def tryComplete() : Boolean = {
trace(s"Trying to complete operation for ${futures.size} futures")
val pending = futures.count(future => !future.isDone)
if (pending == 0) {
trace("All futures have been completed or have errors, completing the delayed operation")
forceComplete()
} else {
trace(s"$pending future still pending, not completing the delayed operation")
false
}
}
/**
* Timeout any pending futures and invoke responseCallback. This is invoked when all
* futures have completed or the operation has timed out.
*/
override def onComplete(): Unit = {
val pendingFutures = futures.filterNot(_.isDone)
trace(s"Completing operation for ${futures.size} futures, expired ${pendingFutures.size}")
pendingFutures.foreach(_.completeExceptionally(new TimeoutException(s"Request has been timed out after $timeoutMs ms")))
responseCallback.apply()
}
/**
* This is invoked after onComplete(), so no actions required.
*/
override def onExpiration(): Unit = {
}
}
class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
private val purgatory = new DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
private val executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](),
new ThreadFactory {
override def newThread(r: Runnable): Thread = new KafkaThread(s"DelayedExecutor-$purgatoryName", r, true)
})
private val purgatoryKey = new DelayedOperationKey() {
override def keyLabel(): String = "delayed-future-key"
}
def tryCompleteElseWatch[T](timeoutMs: Long,
futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit): DelayedFuture[T] = {
val delayedFuture = new DelayedFuture[T](timeoutMs, futures, responseCallback)
val done = purgatory.tryCompleteElseWatch(delayedFuture, util.Collections.singletonList(purgatoryKey))
if (!done) {
val callbackAction = new BiConsumer[Void, Throwable]() {
override def accept(result: Void, exception: Throwable): Unit = delayedFuture.forceComplete()
}
CompletableFuture.allOf(futures.toArray: _*).whenCompleteAsync(callbackAction, executor)
}
delayedFuture
}
def shutdown(): Unit = {
executor.shutdownNow()
executor.awaitTermination(60, TimeUnit.SECONDS)
purgatory.shutdown()
}
def isShutdown: Boolean = executor.isShutdown
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.server
import kafka.server.DelayedFuturePurgatory
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Time
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import java.util.concurrent.{CompletableFuture, ExecutionException}
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters.CollectionHasAsScala
class DelayedFutureTest {
@Test
def testDelayedFuture(): Unit = {
val purgatoryName = "testDelayedFuture"
val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
try {
val result = new AtomicInteger()
def hasExecutorThread: Boolean = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
.exists(_.contains(s"DelayedExecutor-$purgatoryName"))
def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
assertFalse(hasExecutorThread, "Unnecessary thread created")
// Two completed futures: callback should be executed immediately on the same thread
val futures1 = List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () => updateResult(futures1))
assertTrue(r1.isCompleted, "r1 not completed")
assertEquals(21, result.get())
assertFalse(hasExecutorThread, "Unnecessary thread created")
// Two delayed futures: callback should wait for both to complete
result.set(-1)
val futures2 = List(new CompletableFuture[Integer], new CompletableFuture[Integer])
val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () => updateResult(futures2))
assertFalse(r2.isCompleted, "r2 should be incomplete")
futures2.head.complete(20)
assertFalse(r2.isCompleted)
assertEquals(-1, result.get())
futures2(1).complete(21)
TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
assertTrue(hasExecutorThread, "Thread not created for executing delayed task")
// One immediate and one delayed future: callback should wait for delayed task to complete
result.set(-1)
val futures3 = List(new CompletableFuture[Integer], CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () => updateResult(futures3))
assertFalse(r3.isCompleted, "r3 should be incomplete")
assertEquals(-1, result.get())
futures3.head.complete(30)
TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
// One future doesn't complete within timeout. Should expire and invoke callback after timeout.
result.set(-1)
val start = Time.SYSTEM.hiResClockMs
val expirationMs = 2000L
val futures4 = List(new CompletableFuture[Integer], new CompletableFuture[Integer])
val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4, () => updateResult(futures4))
futures4.head.complete(40)
TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not expired")
assertTrue(r4.isCompleted, "r4 not completed after timeout")
val elapsed = Time.SYSTEM.hiResClockMs - start
assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed should at least $expirationMs")
assertEquals(40, futures4.head.get)
assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
assertThrows(classOf[ExecutionException], () => futures4(1).get).getCause.getClass)
assertEquals(40, result.get())
} finally {
purgatory.shutdown()
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* A delayed operation using CompletionFutures that can be created by KafkaApis and watched
* in a DelayedFuturePurgatory purgatory. This is used for ACL updates using async Authorizers.
*/
public class DelayedFuture<T> extends DelayedOperation {
private final Logger log = new LogContext().logger(DelayedFuture.class.getName());
private final List<CompletableFuture<T>> futures;
private final Runnable responseCallback;
private final long timeoutMs;
public DelayedFuture(long timeoutMs, List<CompletableFuture<T>> futures, Runnable responseCallback) {
super(timeoutMs);
this.timeoutMs = timeoutMs;
this.futures = futures;
this.responseCallback = responseCallback;
}
/**
* The operation can be completed if all the futures have completed successfully
* or failed with exceptions.
*/
@Override
public boolean tryComplete() {
log.trace("Trying to complete operation for {} futures", futures.size());
long pending = futures.stream().filter(future -> !future.isDone()).count();
if (pending == 0) {
log.trace("All futures have been completed or have errors, completing the delayed operation");
return forceComplete();
} else {
log.trace("{} future still pending, not completing the delayed operation", pending);
return false;
}
}
/**
* Timeout any pending futures and invoke responseCallback. This is invoked when all
* futures have completed or the operation has timed out.
*/
@Override
public void onComplete() {
List<CompletableFuture<T>> pendingFutures = futures.stream().filter(future -> !future.isDone()).toList();
log.trace("Completing operation for {} futures, expired {}", futures.size(), pendingFutures.size());
pendingFutures.forEach(future -> future.completeExceptionally(new TimeoutException("Request has been timed out after " + timeoutMs + " ms")));
responseCallback.run();
}
/**
* This is invoked after onComplete(), so no actions required.
*/
@Override
public void onExpiration() {
// This is invoked after onComplete(), so no actions required.
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.utils.KafkaThread;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
public class DelayedFuturePurgatory {
private final DelayedOperationPurgatory<DelayedFuture<?>> purgatory;
private final ThreadPoolExecutor executor;
private final DelayedOperationKey purgatoryKey;
public DelayedFuturePurgatory(String purgatoryName, int brokerId) {
this.purgatory = new DelayedOperationPurgatory<>(purgatoryName, brokerId);
this.executor = new ThreadPoolExecutor(
1,
1,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
r -> new KafkaThread("DelayedExecutor-" + purgatoryName, r, true));
this.purgatoryKey = () -> "delayed-future-key";
}
public <T> DelayedFuture<T> tryCompleteElseWatch(
long timeoutMs,
List<CompletableFuture<T>> futures,
Runnable responseCallback
) {
DelayedFuture<T> delayedFuture = new DelayedFuture<>(timeoutMs, futures, responseCallback);
boolean done = purgatory.tryCompleteElseWatch(delayedFuture, List.of(purgatoryKey));
if (!done) {
BiConsumer<Void, Throwable> callbackAction = (result, exception) -> delayedFuture.forceComplete();
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).whenCompleteAsync(callbackAction, executor);
}
return delayedFuture;
}
public void shutdown() throws Exception {
executor.shutdownNow();
executor.awaitTermination(60, TimeUnit.SECONDS);
purgatory.shutdown();
}
public boolean isShutdown() {
return executor.isShutdown();
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class DelayedFutureTest {
@Test
void testDelayedFuture() throws Exception {
String purgatoryName = "testDelayedFuture";
DelayedFuturePurgatory purgatory = new DelayedFuturePurgatory(purgatoryName, 0);
try {
AtomicInteger result = new AtomicInteger();
Supplier<Boolean> hasExecutorThread = () -> Thread.getAllStackTraces().keySet().stream()
.map(Thread::getName)
.anyMatch(name -> name.contains("DelayedExecutor-" + purgatoryName));
Function<List<CompletableFuture<Integer>>, Void> updateResult = futures -> {
result.set(futures.stream()
.filter(Predicate.not(CompletableFuture::isCompletedExceptionally))
.mapToInt(future -> assertDoesNotThrow(() -> future.get()))
.sum());
return null;
};
assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
// Two completed futures: callback should be executed immediately on the same thread
List<CompletableFuture<Integer>> futures1 = List.of(
CompletableFuture.completedFuture(10),
CompletableFuture.completedFuture(11)
);
DelayedFuture<Integer> r1 = purgatory.tryCompleteElseWatch(100000L, futures1, () -> updateResult.apply(futures1));
assertTrue(r1.isCompleted(), "r1 not completed");
assertEquals(21, result.get());
assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
// Two delayed futures: callback should wait for both to complete
result.set(-1);
List<CompletableFuture<Integer>> futures2 = List.of(new CompletableFuture<>(), new CompletableFuture<>());
DelayedFuture<Integer> r2 = purgatory.tryCompleteElseWatch(100000L, futures2, () -> updateResult.apply(futures2));
assertFalse(r2.isCompleted(), "r2 should be incomplete");
futures2.get(0).complete(20);
assertFalse(r2.isCompleted());
assertEquals(-1, result.get());
futures2.get(1).complete(21);
TestUtils.waitForCondition(r2::isCompleted, "r2 not completed");
TestUtils.waitForCondition(() -> result.get() == 41, "callback not invoked");
assertTrue(hasExecutorThread.get(), "Thread not created for executing delayed task");
// One immediate and one delayed future: callback should wait for delayed task to complete
result.set(-1);
List<CompletableFuture<Integer>> futures3 = List.of(
new CompletableFuture<>(),
CompletableFuture.completedFuture(31)
);
DelayedFuture<Integer> r3 = purgatory.tryCompleteElseWatch(100000L, futures3, () -> updateResult.apply(futures3));
assertFalse(r3.isCompleted(), "r3 should be incomplete");
assertEquals(-1, result.get());
futures3.get(0).complete(30);
TestUtils.waitForCondition(r3::isCompleted, "r3 not completed");
TestUtils.waitForCondition(() -> result.get() == 61, "callback not invoked");
// One future doesn't complete within timeout. Should expire and invoke callback after timeout.
result.set(-1);
long start = Time.SYSTEM.hiResClockMs();
long expirationMs = 2000L;
List<CompletableFuture<Integer>> futures4 = List.of(new CompletableFuture<>(), new CompletableFuture<>());
DelayedFuture<Integer> r4 = purgatory.tryCompleteElseWatch(expirationMs, futures4, () -> updateResult.apply(futures4));
futures4.get(0).complete(40);
TestUtils.waitForCondition(() -> futures4.get(1).isDone(), "r4 futures not expired");
assertTrue(r4.isCompleted(), "r4 not completed after timeout");
long elapsed = Time.SYSTEM.hiResClockMs() - start;
assertTrue(elapsed >= expirationMs, "Time for expiration " + elapsed + " should at least " + expirationMs);
assertEquals(40, futures4.get(0).get());
Exception exception = assertThrows(ExecutionException.class, () -> futures4.get(1).get());
assertEquals(TimeoutException.class, exception.getCause().getClass());
assertEquals(40, result.get());
} finally {
purgatory.shutdown();
}
}
}