KAFKA-15931: Cancel RemoteLogReader gracefully (#19150)

Backports f24945b519 to 3.9

Instead of reopening the transaction index, it cancels the RemoteFetchTask without interrupting it--avoiding to close the TransactionIndex channel.

This will lead to complete the execution of the remote fetch but ignoring the results. Given that this is considered a rare case, we could live with this. If it becomes a performance issue, it could be optimized.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Jorge Esteban Quilcate Otoya 2025-04-02 00:10:12 +03:00 committed by GitHub
parent 4b86ff9d00
commit c37ac21cc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 3 deletions

View File

@ -84,8 +84,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
}
override def onExpiration(): Unit = {
// cancel the remote storage read task, if it has not been executed yet
val cancelled = remoteFetchTask.cancel(true)
// cancel the remote storage read task, if it has not been executed yet and
// avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index.
val cancelled = remoteFetchTask.cancel(false)
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()

View File

@ -199,7 +199,7 @@ class DelayedRemoteFetchTest {
delayedRemoteFetch.run()
// Check that the task was cancelled and force-completed
verify(remoteFetchTask).cancel(true)
verify(remoteFetchTask).cancel(false)
assertTrue(delayedRemoteFetch.isCompleted)
// Check that the ExpiresPerSec metric was incremented