mirror of https://github.com/apache/kafka.git
KAFKA-17766: Fixing deadlock in TopicBasedRemoteLogMetadataManager (#17492)
Issue Details:
Inside TopicBasedRemoteLogMetadataManager::close, one thread (t1) calls join on initializationThread after taking the writeLock on the "lock" object, so t1 waits for initializationThread to complete. Internally, initializationThread also takes the writeLock on the same "lock" object. This can cause a deadlock in the following situation:

1. initializationThread is started.
2. close has been invoked on a separate thread, but that thread is not yet scheduled by the OS.
3. At line 430, initializationThread is preempted and the OS starts running the close thread.
4. close takes the writeLock and invokes join on initializationThread.
5. The OS schedules initializationThread again, and at line 433 this thread also tries to take the writeLock.

Because the writeLock is already held by the close thread, both threads wait on each other: initializationThread waits for close to release the writeLock, while the close thread waits for initializationThread to complete.

Fix Details:
We can avoid taking the lock inside the close() method, as there are no operations with any side effects. The closing instance variable is of type AtomicBoolean, so there is no race condition when updating it to true.

Co-authored-by: Anshul Goyal <anshul.goyal@broadcom.com>
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
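The interleaving described in the issue details can be reproduced in isolation. The following is a minimal sketch, not the Kafka code: the class name CloseDeadlockSketch, the closerReady latch, and the bounded join are all assumptions made for illustration. The latch forces the closer to run before the initialization thread takes the lock, and the join uses a timeout so the demonstration reports the stall instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the manager; only the locking shape follows the commit message.
class CloseDeadlockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicBoolean closing = new AtomicBoolean(false);
    // Illustration-only latch: holds the init thread back until a close call is underway.
    private final CountDownLatch closerReady = new CountDownLatch(1);
    private final Thread initializationThread;

    CloseDeadlockSketch() {
        initializationThread = new Thread(() -> {
            try {
                closerReady.await(); // simulates being preempted before taking the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            lock.writeLock().lock(); // the same lock the old close() held while joining
            try {
                // initialization work would go here
            } finally {
                lock.writeLock().unlock();
            }
        });
        initializationThread.start();
    }

    // Old shape: join while holding the write lock.
    // Pass a positive timeout; join(0) would wait forever.
    // Returns true if the init thread finished within the timeout.
    boolean closeHoldingLock(long joinTimeoutMs) {
        lock.writeLock().lock();
        try {
            closerReady.countDown();
            initializationThread.join(joinTimeoutMs);
            return !initializationThread.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.writeLock().unlock(); // releasing here lets the init thread finish after the demo
        }
    }

    // New shape: no lock around the join; the AtomicBoolean guards against repeated closes.
    boolean closeWithoutLock(long joinTimeoutMs) {
        if (closing.compareAndSet(false, true)) {
            closerReady.countDown();
            try {
                initializationThread.join(joinTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return !initializationThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("join under lock finished: " + new CloseDeadlockSketch().closeHoldingLock(200));
        System.out.println("join without lock finished: " + new CloseDeadlockSketch().closeWithoutLock(5000));
    }
}
```

With the lock held, the bounded join times out because the initialization thread is blocked on the write lock; with an unbounded join, as in the original close(), neither thread would make progress.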
parent d6b5943570
commit 5cf112dc39
@@ -448,6 +448,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
                     log.info("Initialized topic-based RLMM resources successfully");
                 } catch (Exception e) {
                     log.error("Encountered error while initializing producer/consumer", e);
+                    initializationFailed = true;
                     return;
                 } finally {
                     lock.writeLock().unlock();
@@ -568,23 +569,18 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
         // Close all the resources.
         log.info("Closing topic-based RLMM resources");
         if (closing.compareAndSet(false, true)) {
-            lock.writeLock().lock();
-            try {
-                if (initializationThread != null) {
-                    try {
-                        initializationThread.join();
-                    } catch (InterruptedException e) {
-                        log.error("Initialization thread was interrupted while waiting to join on close.", e);
-                    }
+            if (initializationThread != null) {
+                try {
+                    initializationThread.join();
+                } catch (InterruptedException e) {
+                    log.error("Initialization thread was interrupted while waiting to join on close.", e);
                 }
-
-                Utils.closeQuietly(producerManager, "ProducerTask");
-                Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
-                Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore");
-            } finally {
-                lock.writeLock().unlock();
-                log.info("Closed topic-based RLMM resources");
             }
+
+            Utils.closeQuietly(producerManager, "ProducerTask");
+            Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
+            Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore");
+            log.info("Closed topic-based RLMM resources");
         }
     }
 }
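The close() path relies on closing.compareAndSet(false, true) rather than the write lock to ensure cleanup runs only once. A small sketch of that guard follows; the class IdempotentCloseSketch, the cleanupRuns counter, and the closeConcurrently helper are hypothetical names introduced only for this illustration and do not exist in Kafka.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a compareAndSet-guarded close; not the Kafka class.
class IdempotentCloseSketch {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    // Counts how many callers actually performed the cleanup.
    final AtomicInteger cleanupRuns = new AtomicInteger();

    void close() {
        // Only the first caller flips false -> true; every later caller skips the body.
        if (closing.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet(); // stands in for closing producer/consumer resources
        }
    }

    // Calls close() from several threads released at the same moment
    // and returns how many times the cleanup body ran.
    static int closeConcurrently(int callers) {
        IdempotentCloseSketch sketch = new IdempotentCloseSketch();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sketch.close();
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.cleanupRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran " + closeConcurrently(8) + " time(s)");
    }
}
```

Because compareAndSet is atomic, exactly one caller observes the transition from false to true, so the cleanup body does not need a lock to avoid running twice.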