mirror of https://github.com/apache/kafka.git
KAFKA-18845: Fix flaky QuorumControllerTest#testUncleanShutdownBrokerElrEnabled (#19240)
There are two root causes: 1. When we uncleanly shut down `brokerToBeTheLeader`, we didn't wait for the result. That means that when we send a heartbeat to unfence the broker, it may use a stale broker epoch in the request. [0] 2. We used a different replica directory when uncleanly shutting down the broker. Even if the broker is unfenced, it cannot get an online directory, so `brokerToBeTheLeader` cannot be elected as the new leader. [1] [0] a5325e029e/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java (L484-L497)
[1] a5325e029e/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java (L2470-L2477)
Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
parent
f24945b519
commit
71875ec58e
|
@ -380,6 +380,9 @@ public class QuorumControllerTest {
|
|||
@Test
|
||||
public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
|
||||
List<Integer> allBrokers = List.of(1, 2, 3);
|
||||
Map<Integer, Uuid> brokerLogDirs = allBrokers.stream().collect(
|
||||
Collectors.toMap(identity(), brokerId -> Uuid.randomUuid())
|
||||
);
|
||||
short replicationFactor = (short) allBrokers.size();
|
||||
long sessionTimeoutMillis = 500;
|
||||
|
||||
|
@ -406,7 +409,7 @@ public class QuorumControllerTest {
|
|||
setClusterId(active.clusterId()).
|
||||
setFeatures(features).
|
||||
setIncarnationId(Uuid.randomUuid()).
|
||||
setLogDirs(List.of(Uuid.randomUuid())).
|
||||
setLogDirs(List.of(brokerLogDirs.get(brokerId))).
|
||||
setListeners(listeners));
|
||||
brokerEpochs.put(brokerId, reply.get().epoch());
|
||||
}
|
||||
|
@ -474,7 +477,7 @@ public class QuorumControllerTest {
|
|||
setClusterId(active.clusterId()).
|
||||
setFeatures(features).
|
||||
setIncarnationId(Uuid.randomUuid()).
|
||||
setLogDirs(List.of(Uuid.randomUuid())).
|
||||
setLogDirs(List.of(brokerLogDirs.get(brokerToUncleanShutdown))).
|
||||
setListeners(listeners));
|
||||
brokerEpochs.put(brokerToUncleanShutdown, reply.get().epoch());
|
||||
partition = active.replicationControl().getPartition(topicIdFoo, 0);
|
||||
|
@ -482,7 +485,7 @@ public class QuorumControllerTest {
|
|||
assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString());
|
||||
|
||||
// Unclean shutdown should not remove the last known ELR members.
|
||||
active.registerBroker(
|
||||
CompletableFuture<BrokerRegistrationReply> replyLeader = active.registerBroker(
|
||||
anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
|
||||
new BrokerRegistrationRequestData().
|
||||
setBrokerId(brokerToBeTheLeader).
|
||||
|
@ -490,8 +493,15 @@ public class QuorumControllerTest {
|
|||
setFeatures(features).
|
||||
setIncarnationId(Uuid.randomUuid()).
|
||||
setPreviousBrokerEpoch(brokerEpochs.get(brokerToBeTheLeader)).
|
||||
setLogDirs(List.of(Uuid.randomUuid())).
|
||||
setLogDirs(List.of(brokerLogDirs.get(brokerToBeTheLeader))).
|
||||
setListeners(listeners));
|
||||
brokerEpochs.put(brokerToBeTheLeader, replyLeader.get().epoch());
|
||||
partition = active.replicationControl().getPartition(topicIdFoo, 0);
|
||||
int[] expectedIsr = {brokerToBeTheLeader};
|
||||
assertArrayEquals(expectedIsr, partition.elr, "The ELR for topic partition foo-0 was " + Arrays.toString(partition.elr) +
|
||||
". It is expected to be " + Arrays.toString(expectedIsr));
|
||||
assertArrayEquals(lastKnownElr, partition.lastKnownElr, "The last known ELR for topic partition foo-0 was " + Arrays.toString(partition.lastKnownElr) +
|
||||
". It is expected to be " + Arrays.toString(lastKnownElr));
|
||||
|
||||
// Unfence the last one in the ELR, it should be elected.
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, List.of(brokerToBeTheLeader), brokerEpochs);
|
||||
|
|
Loading…
Reference in New Issue