KAFKA-19386: Correcting ExpirationReaper thread names from Purgatory (#19918)

The PR: https://github.com/apache/kafka/pull/17636 migrated
DelayedOperationPurgatory from scala to java, and instatiated
`expirationReaper` at instance level where `purgatoryName` is still
`null` hence all expiration threads from different Purgatories has
incorrect names.

<img width="216" alt="Screenshot 2025-06-07 at 01 28 58"

src="https://github.com/user-attachments/assets/fd1b8137-b290-42e0-9a95-258fde5737d2"
/>

The PR fixes the instatiation of ExpirationReaper, in constructor when
`purgatoryName` is defined.

<img width="296" alt="Screenshot 2025-06-07 at 01 31 27"

src="https://github.com/user-attachments/assets/9912311b-ddf6-4554-8e04-d0b8ad208abc"
/>

This issue affects 4.0 version as well, though minor.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Apoorv Mittal 2025-06-09 12:10:59 +01:00 committed by GitHub
parent d6861f3f15
commit d07aa37412
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 4 additions and 1 deletions

View File

@ -46,7 +46,7 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
// the number of estimated total operations in the purgatory
private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0);
/* background thread expiring operations that have timed out */
private final ExpiredOperationReaper expirationReaper = new ExpiredOperationReaper();
private final ExpiredOperationReaper expirationReaper;
private final String purgatoryName;
private final Timer timeoutTimer;
private final int brokerId;
@ -86,6 +86,9 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
this.purgeInterval = purgeInterval;
this.reaperEnabled = reaperEnabled;
this.timerEnabled = timerEnabled;
// The initialization of the expiration reaper thread is done after the brokerId and purgatoryName
// are set, so that the thread name can include these values.
this.expirationReaper = new ExpiredOperationReaper();
watcherLists = new ArrayList<>(SHARDS);
for (int i = 0; i < SHARDS; i++) {