From c37ac21cc9ad11e0bec301ae9c84a35edd5392c3 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 2 Apr 2025 00:10:12 +0300 Subject: [PATCH] KAFKA-15931: Cancel RemoteLogReader gracefully (#19150) Backports f24945b519005c0bc7a28db2db7aae6cec158927 to 3.9 Instead of reopening the transaction index, this change cancels the RemoteFetchTask without interrupting it, which avoids closing the TransactionIndex channel. As a result, the remote fetch runs to completion but its results are ignored. Given that this is considered a rare case, we can live with this. If it becomes a performance issue, it can be optimized. Reviewers: Jun Rao --- core/src/main/scala/kafka/server/DelayedRemoteFetch.scala | 5 +++-- .../integration/kafka/server/DelayedRemoteFetchTest.scala | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala index 58a866aa4a6..f9776fb287d 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala @@ -84,8 +84,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void], } override def onExpiration(): Unit = { - // cancel the remote storage read task, if it has not been executed yet - val cancelled = remoteFetchTask.cancel(true) + // cancel the remote storage read task, if it has not been executed yet and + // avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index. 
+ val cancelled = remoteFetchTask.cancel(false) if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}") DelayedRemoteFetchMetrics.expiredRequestMeter.mark() diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala index ea1ffaf0b11..ce758992fea 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala @@ -199,7 +199,7 @@ class DelayedRemoteFetchTest { delayedRemoteFetch.run() // Check that the task was cancelled and force-completed - verify(remoteFetchTask).cancel(true) + verify(remoteFetchTask).cancel(false) assertTrue(delayedRemoteFetch.isCompleted) // Check that the ExpiresPerSec metric was incremented