mirror of https://github.com/apache/kafka.git
KAFKA-18498: Update lock ownership from main thread (#18732)
Once a StreamThread receives its assignment, it will close the startup tasks. But during the closing process, the StandbyTask.closeClean() method will eventually call the StateManagerUtil.closeStateManager method, which needs to lock the state directory, and locking requires that the calling thread be the current owner. Since the main thread grabs the lock on startup but moves on without releasing it, we need to update ownership explicitly here in order for the stream thread to close the startup task and begin processing. Reviewers: Matthias Sax <mjsax@apache.org>, Nick Telford
This commit is contained in:
parent
85109a5111
commit
20b073bbee
|
|
@ -51,7 +51,6 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -107,7 +106,7 @@ public class StateDirectory implements AutoCloseable {
|
|||
private final boolean hasPersistentStores;
|
||||
private final boolean hasNamedTopologies;
|
||||
|
||||
private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
|
||||
private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();
|
||||
|
||||
private FileChannel stateDirLockChannel;
|
||||
private FileLock stateDirLock;
|
||||
|
|
@ -286,7 +285,7 @@ public class StateDirectory implements AutoCloseable {
|
|||
// "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
|
||||
final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
|
||||
for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
|
||||
if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
|
||||
if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
|
||||
// only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
|
||||
// to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
|
||||
drainedTasks.add(entry.getValue());
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark.resource import cluster
|
||||
from ducktape.mark import ignore
|
||||
from kafkatest.tests.kafka_test import KafkaTest
|
||||
from kafkatest.services.kafka import quorum
|
||||
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
|
||||
|
|
@ -39,7 +38,6 @@ class StreamsEosTest(KafkaTest):
|
|||
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
|
||||
self.test_context = test_context
|
||||
|
||||
@ignore
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft])
|
||||
def test_rebalance_simple(self, metadata_quorum):
|
||||
|
|
@ -47,7 +45,6 @@ class StreamsEosTest(KafkaTest):
|
|||
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
|
||||
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
|
||||
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
|
||||
@ignore
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft])
|
||||
def test_rebalance_complex(self, metadata_quorum):
|
||||
|
|
@ -82,7 +79,6 @@ class StreamsEosTest(KafkaTest):
|
|||
|
||||
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
|
||||
|
||||
@ignore
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft])
|
||||
def test_failure_and_recovery(self, metadata_quorum):
|
||||
|
|
@ -90,7 +86,6 @@ class StreamsEosTest(KafkaTest):
|
|||
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
|
||||
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
|
||||
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
|
||||
@ignore
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft])
|
||||
def test_failure_and_recovery_complex(self, metadata_quorum):
|
||||
|
|
|
|||
Loading…
Reference in New Issue