MINOR: Fix some AssignmentsManager bugs (#14954)

- Add proper start & stop for AssignmentsManager's event loop
- Dedupe queued duplicate assignments
- Fix bug where directory ID is resolved too late

Co-authored-by: Gaurav Narula <gaurav_narula2@apple.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Igor Soarez 2023-12-08 23:37:23 +00:00 committed by GitHub
parent 93b6df6173
commit 8c184b4743
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 180 additions and 44 deletions

View File

@ -2325,6 +2325,8 @@ class ReplicaManager(val config: KafkaConfig,
warn(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " + warn(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " +
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.") s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
} }
// retrieve the UUID here because logManager.handleLogDirFailure handler removes it
val uuid = logManager.directoryId(dir)
logManager.handleLogDirFailure(dir) logManager.handleLogDirFailure(dir)
if (dir == config.metadataLogDir) { if (dir == config.metadataLogDir) {
fatal(s"Shutdown broker because the metadata log dir $dir has failed") fatal(s"Shutdown broker because the metadata log dir $dir has failed")
@ -2337,7 +2339,6 @@ class ReplicaManager(val config: KafkaConfig,
Exit.halt(1) Exit.halt(1)
} }
if (zkClient.isEmpty) { if (zkClient.isEmpty) {
val uuid = logManager.directoryId(dir)
if (uuid.isDefined) { if (uuid.isDefined) {
directoryEventHandler.handleFailure(uuid.get) directoryEventHandler.handleFailure(uuid.get)
} else { } else {

View File

@ -19,6 +19,10 @@ package org.apache.kafka.server;
import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
@ -33,6 +37,7 @@ import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -84,12 +89,13 @@ public class AssignmentsManager {
this.brokerEpochSupplier = brokerEpochSupplier; this.brokerEpochSupplier = brokerEpochSupplier;
this.eventQueue = new KafkaEventQueue(time, this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + brokerId + "]"), new LogContext("[AssignmentsManager id=" + brokerId + "]"),
"broker-" + brokerId + "-directory-assignments-manager-"); "broker-" + brokerId + "-directory-assignments-manager-",
new ShutdownEvent());
channelManager.start();
} }
public void close() throws InterruptedException { public void close() throws InterruptedException {
eventQueue.close(); eventQueue.close();
channelManager.shutdown();
} }
public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, Runnable callback) { public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, Runnable callback) {
@ -119,6 +125,16 @@ public class AssignmentsManager {
} }
} }
/**
* Handles shutdown of the {@link AssignmentsManager}.
*/
private class ShutdownEvent extends Event {
@Override
public void run() throws Exception {
channelManager.shutdown();
}
}
/** /**
* Handles new generated assignments, to be propagated to the controller. * Handles new generated assignments, to be propagated to the controller.
* Assignment events may be handled out of order, so for any two assignment * Assignment events may be handled out of order, so for any two assignment
@ -139,12 +155,19 @@ public class AssignmentsManager {
@Override @Override
public void run() throws Exception { public void run() throws Exception {
AssignmentEvent existing = pending.getOrDefault(partition, null); AssignmentEvent existing = pending.getOrDefault(partition, null);
if (existing != null && existing.timestampNs > timestampNs) { if (existing == null && inflight != null) {
if (log.isDebugEnabled()) { existing = inflight.getOrDefault(partition, null);
log.debug("Dropping assignment {} because it's older than {}", this, existing);
} }
if (existing != null) {
if (existing.dirId.equals(dirId)) {
if (log.isDebugEnabled()) log.debug("Ignoring duplicate assignment {}", this);
return; return;
} }
if (existing.timestampNs > timestampNs) {
if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than {}", this, existing);
return;
}
}
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received new assignment {}", this); log.debug("Received new assignment {}", this);
} }
@ -240,8 +263,8 @@ public class AssignmentsManager {
Set<AssignmentEvent> completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed); Set<AssignmentEvent> completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed);
completed.forEach(assignmentEvent -> assignmentEvent.callback.run()); completed.forEach(assignmentEvent -> assignmentEvent.callback.run());
log.warn("Re-queueing assignments: {}", failed);
if (!failed.isEmpty()) { if (!failed.isEmpty()) {
log.warn("Re-queueing assignments: {}", failed);
for (AssignmentEvent event : failed) { for (AssignmentEvent event : failed) {
pending.put(event.partition, event); pending.put(event.partition, event);
} }
@ -376,4 +399,27 @@ public class AssignmentsManager {
} }
return failures; return failures;
} }
// visible for testing
static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
TopicIdPartition topicPartition = entry.getKey();
Uuid directoryId = entry.getValue();
DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new DirectoryData().setId(directoryId));
TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
.computeIfAbsent(topicPartition.topicId(), topicId -> {
TopicData data = new TopicData().setTopicId(topicId);
directory.topics().add(data);
return data;
});
PartitionData partition = new PartitionData().setPartitionIndex(topicPartition.partitionId());
topic.partitions().add(partition);
}
return new AssignReplicasToDirsRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
} }

View File

@ -33,7 +33,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,6 +45,7 @@ import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
import static org.apache.kafka.metadata.AssignmentsHelper.normalize; import static org.apache.kafka.metadata.AssignmentsHelper.normalize;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -71,11 +76,77 @@ public class AssignmentsManagerTest {
manager.close(); manager.close();
} }
AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {
request = request.duplicate();
request.directories().sort(Comparator.comparing(
AssignReplicasToDirsRequestData.DirectoryData::id));
for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
directory.topics().sort(Comparator.comparing(
AssignReplicasToDirsRequestData.TopicData::topicId));
for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
topic.partitions().sort(Comparator.comparing(
AssignReplicasToDirsRequestData.PartitionData::partitionIndex));
}
}
return request;
}
void assertRequestEquals(AssignReplicasToDirsRequestData expected, AssignReplicasToDirsRequestData actual) {
void assertRequestEquals(
AssignReplicasToDirsRequestData expected,
AssignReplicasToDirsRequestData actual
) {
assertEquals(normalize(expected), normalize(actual)); assertEquals(normalize(expected), normalize(actual));
} }
@Test
void testBuildRequestData() {
Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
}};
AssignReplicasToDirsRequestData built = AssignmentsManager.buildRequestData(8, 100L, assignment);
AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
.setBrokerId(8)
.setBrokerEpoch(100L)
.setDirectories(Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_2)
.setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(2))),
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_2)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(5))))),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_3)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(3))))),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_1)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(4),
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(1)))))));
assertRequestEquals(expected, built);
}
@Test @Test
public void testAssignmentAggregation() throws InterruptedException { public void testAssignmentAggregation() throws InterruptedException {
CountDownLatch readyToAssert = new CountDownLatch(1); CountDownLatch readyToAssert = new CountDownLatch(1);
@ -84,7 +155,8 @@ public class AssignmentsManagerTest {
readyToAssert.countDown(); readyToAssert.countDown();
} }
return null; return null;
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class)); }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
any(ControllerRequestCompletionHandler.class));
manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, () -> { });
manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2, () -> { });
@ -96,8 +168,11 @@ public class AssignmentsManagerTest {
manager.wakeup(); manager.wakeup();
} }
ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor = ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class); ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
verify(channelManager, times(1)).start();
verify(channelManager).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class)); verify(channelManager).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
verify(channelManager, atMostOnce()).shutdown();
verifyNoMoreInteractions(channelManager); verifyNoMoreInteractions(channelManager);
assertEquals(1, captor.getAllValues().size()); assertEquals(1, captor.getAllValues().size());
AssignReplicasToDirsRequestData actual = captor.getValue().build().data(); AssignReplicasToDirsRequestData actual = captor.getValue().build().data();
@ -127,27 +202,37 @@ public class AssignmentsManagerTest {
if (readyToAssert.getCount() == 3) { if (readyToAssert.getCount() == 3) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete( invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
new ClientResponse(null, null, null, 0L, 0L, false, false, new ClientResponse(null, null, null, 0L, 0L, false, false,
new UnsupportedVersionException("test unsupported version exception"), null, null) new UnsupportedVersionException("test unsupported version exception"), null, null));
);
manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), () -> { }); // duplicate should be ignored
manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3, () -> { });
manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), () -> { });
} }
if (readyToAssert.getCount() == 2) { if (readyToAssert.getCount() == 2) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete( invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
new ClientResponse(null, null, null, 0L, 0L, false, false, null, new ClientResponse(null, null, null, 0L, 0L, false, false, null,
new AuthenticationException("test authentication exception"), null) new AuthenticationException("test authentication exception"), null)
); );
manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"), () -> { });
// duplicate should be ignored
manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), () -> { });
manager.onAssignment(new TopicIdPartition(TOPIC_1, 4),
Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"), () -> { });
} }
if (readyToAssert.getCount() == 1) { if (readyToAssert.getCount() == 1) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete( invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
new ClientResponse(null, null, null, 0L, 0L, false, false, null, null, new ClientResponse(null, null, null, 0L, 0L, false, false, null, null,
new AssignReplicasToDirsResponse(new AssignReplicasToDirsResponseData() new AssignReplicasToDirsResponse(new AssignReplicasToDirsResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code()) .setErrorCode(Errors.NOT_CONTROLLER.code())
.setThrottleTimeMs(0))) .setThrottleTimeMs(0))));
);
} }
return null; return null;
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class)); }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
any(ControllerRequestCompletionHandler.class));
manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, () -> { });
while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) { while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
@ -155,8 +240,12 @@ public class AssignmentsManagerTest {
manager.wakeup(); manager.wakeup();
} }
ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor = ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class); ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
verify(channelManager, times(5)).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class)); ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
verify(channelManager, times(1)).start();
verify(channelManager, times(5)).sendRequest(captor.capture(),
any(ControllerRequestCompletionHandler.class));
verify(channelManager, atMostOnce()).shutdown();
verifyNoMoreInteractions(channelManager); verifyNoMoreInteractions(channelManager);
assertEquals(5, captor.getAllValues().size()); assertEquals(5, captor.getAllValues().size());
assertRequestEquals(buildRequestData( assertRequestEquals(buildRequestData(