KAFKA-15357: Aggregate and propagate assignments

A new AssignmentsManager accumulates, batches, and sends KIP-858
assignment events to the Controller. Assignments are sent via
AssignReplicasToDirs requests.

Move QuorumTestHarness.formatDirectories into TestUtils so it can be
used in other test contexts.

Fix a bug in ControllerRegistration.java where the wrong version of the
record was being generated in ControllerRegistration.toRecord.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>
This commit is contained in:
Igor Soarez 2023-09-11 22:44:40 +01:00 committed by Colin P. McCabe
parent b1d83e2b04
commit a03a71d7b5
15 changed files with 839 additions and 38 deletions

View File

@ -36,6 +36,7 @@
<allow pkg="kafka.utils" /> <allow pkg="kafka.utils" />
<allow pkg="kafka.serializer" /> <allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry <!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable --> https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->

View File

@ -27,6 +27,13 @@ import java.nio.ByteBuffer;
public class AssignReplicasToDirsRequest extends AbstractRequest { public class AssignReplicasToDirsRequest extends AbstractRequest {
/**
* The maximum number of assignments to be included in a single request.
* This limit was chosen based on the maximum size of AssignReplicasToDirsRequest for
* 10 different directory IDs, so that it still fits in a single TCP packet. i.e. 64KB.
*/
public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;
public static class Builder extends AbstractRequest.Builder<AssignReplicasToDirsRequest> { public static class Builder extends AbstractRequest.Builder<AssignReplicasToDirsRequest> {
private final AssignReplicasToDirsRequestData data; private final AssignReplicasToDirsRequestData data;

View File

@ -0,0 +1,394 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class AssignmentsManager {
private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class);
/**
* Assignments are dispatched to the controller this long after
* being submitted to {@link AssignmentsManager}, if there
* is no request in flight already.
* The interval is reset when a new assignment is submitted.
* If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
* is reached, we ignore this interval and dispatch immediately.
*/
private static final long DISPATCH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10);
private final Time time;
private final NodeToControllerChannelManager channelManager;
private final int brokerId;
private final Supplier<Long> brokerEpochSupplier;
private final KafkaEventQueue eventQueue;
// These variables should only be mutated from the KafkaEventQueue thread
private Map<TopicIdPartition, AssignmentEvent> inflight = null;
private Map<TopicIdPartition, AssignmentEvent> pending = new HashMap<>();
private final ExponentialBackoff resendExponentialBackoff =
new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
private int failedAttempts = 0;
public AssignmentsManager(Time time,
NodeToControllerChannelManager channelManager,
int brokerId,
Supplier<Long> brokerEpochSupplier) {
this.time = time;
this.channelManager = channelManager;
this.brokerId = brokerId;
this.brokerEpochSupplier = brokerEpochSupplier;
this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + brokerId + "]"),
"broker-" + brokerId + "-directory-assignments-manager-");
}
public void close() throws InterruptedException {
eventQueue.close();
channelManager.shutdown();
}
public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId));
}
// only for testing
void wakeup() {
eventQueue.wakeup();
}
/**
* Base class for all the events handled by {@link AssignmentsManager}.
*/
private abstract static class Event implements EventQueue.Event {
/**
* Override the default behavior in
* {@link EventQueue.Event#handleException}
* which swallows the exception.
*/
@Override
public void handleException(Throwable e) {
log.error("Unexpected error handling {}", this, e);
}
}
/**
* Handles new generated assignments, to be propagated to the controller.
* Assignment events may be handled out of order, so for any two assignment
* events for the same topic partition, the one with the oldest timestamp is
* disregarded.
*/
private class AssignmentEvent extends Event {
final long timestampNs;
final TopicIdPartition partition;
final Uuid dirId;
AssignmentEvent(long timestampNs, TopicIdPartition partition, Uuid dirId) {
this.timestampNs = timestampNs;
this.partition = partition;
this.dirId = dirId;
}
@Override
public void run() throws Exception {
AssignmentEvent existing = pending.getOrDefault(partition, null);
if (existing != null && existing.timestampNs > timestampNs) {
if (log.isDebugEnabled()) {
log.debug("Dropping assignment {} because it's older than {}", this, existing);
}
return;
}
if (log.isDebugEnabled()) {
log.debug("Received new assignment {}", this);
}
pending.put(partition, this);
if (inflight == null || inflight.isEmpty()) {
scheduleDispatch();
}
}
@Override
public String toString() {
return "AssignmentEvent{" +
"timestampNs=" + timestampNs +
", partition=" + partition +
", dirId=" + dirId +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentEvent that = (AssignmentEvent) o;
return timestampNs == that.timestampNs &&
Objects.equals(partition, that.partition) &&
Objects.equals(dirId, that.dirId);
}
@Override
public int hashCode() {
return Objects.hash(timestampNs, partition, dirId);
}
}
/**
* Gathers pending assignments and pushes them to the controller in a {@link AssignReplicasToDirsRequest}.
*/
private class DispatchEvent extends Event {
static final String TAG = "dispatch";
@Override
public void run() throws Exception {
if (inflight != null) {
throw new IllegalStateException("Bug. Should not be dispatching while there are assignments in flight");
}
if (pending.isEmpty()) {
log.trace("No pending assignments, no-op dispatch");
return;
}
Collection<AssignmentEvent> events = pending.values();
pending = new HashMap<>();
inflight = new HashMap<>();
for (AssignmentEvent event : events) {
if (inflight.size() < AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
inflight.put(event.partition, event);
} else {
pending.put(event.partition, event);
}
}
if (!pending.isEmpty()) {
log.warn("Too many assignments ({}) to fit in one call, sending only {} and queueing the rest",
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST + pending.size(),
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST);
}
Map<TopicIdPartition, Uuid> assignment = inflight.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().dirId));
if (log.isDebugEnabled()) {
log.debug("Dispatching {} assignments: {}", assignment.size(), assignment);
}
channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
buildRequestData(brokerId, brokerEpochSupplier.get(), assignment)),
new AssignReplicasToDirsRequestCompletionHandler());
}
}
/**
* Handles the response to a dispatched {@link AssignReplicasToDirsRequest}.
*/
private class AssignmentResponseEvent extends Event {
private final ClientResponse response;
public AssignmentResponseEvent(ClientResponse response) {
this.response = response;
}
@Override
public void run() throws Exception {
if (inflight == null) {
throw new IllegalStateException("Bug. Cannot not be handling a client response if there is are no assignments in flight");
}
if (responseIsError(response)) {
requeueAllAfterFailure();
} else {
failedAttempts = 0;
AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
Set<AssignmentEvent> failed = filterFailures(data, inflight);
log.warn("Re-queueing assignments: {}", failed);
if (!failed.isEmpty()) {
for (AssignmentEvent event : failed) {
pending.put(event.partition, event);
}
}
inflight = null;
if (!pending.isEmpty()) {
scheduleDispatch();
}
}
}
}
/**
* Callback for a {@link AssignReplicasToDirsRequest}.
*/
private class AssignReplicasToDirsRequestCompletionHandler extends ControllerRequestCompletionHandler {
@Override
public void onTimeout() {
log.warn("Request to controller timed out");
appendResponseEvent(null);
}
@Override
public void onComplete(ClientResponse response) {
if (log.isDebugEnabled()) {
log.debug("Received controller response: {}", response);
}
appendResponseEvent(response);
}
void appendResponseEvent(ClientResponse response) {
eventQueue.prepend(new AssignmentResponseEvent(response));
}
}
private void scheduleDispatch() {
if (pending.size() < AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
scheduleDispatch(DISPATCH_INTERVAL_NS);
} else {
log.debug("Too many pending assignments, dispatching immediately");
eventQueue.enqueue(EventQueue.EventInsertionType.APPEND, DispatchEvent.TAG + "-immediate",
new EventQueue.NoDeadlineFunction(), new DispatchEvent());
}
}
private void scheduleDispatch(long delayNs) {
if (log.isTraceEnabled()) {
log.debug("Scheduling dispatch in {}ns", delayNs);
}
eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, DispatchEvent.TAG,
new EventQueue.LatestDeadlineFunction(time.nanoseconds() + delayNs), new DispatchEvent());
}
private void requeueAllAfterFailure() {
if (inflight != null) {
log.debug("Re-queueing all in-flight assignments after failure");
for (AssignmentEvent event : inflight.values()) {
pending.put(event.partition, event);
}
inflight = null;
++failedAttempts;
long backoffNs = TimeUnit.MILLISECONDS.toNanos(resendExponentialBackoff.backoff(failedAttempts));
scheduleDispatch(DISPATCH_INTERVAL_NS + backoffNs);
}
}
private static boolean responseIsError(ClientResponse response) {
if (response == null) {
log.debug("Response is null");
return true;
}
if (response.authenticationException() != null) {
log.error("Failed to propagate directory assignments because authentication failed", response.authenticationException());
return true;
}
if (response.versionMismatch() != null) {
log.error("Failed to propagate directory assignments because the request version is unsupported", response.versionMismatch());
return true;
}
if (response.wasDisconnected()) {
log.error("Failed to propagate directory assignments because the connection to the controller was disconnected");
return true;
}
if (response.wasTimedOut()) {
log.error("Failed to propagate directory assignments because the request timed out");
return true;
}
if (response.responseBody() == null) {
log.error("Failed to propagate directory assignments because the Controller returned an empty response");
return true;
}
if (!(response.responseBody() instanceof AssignReplicasToDirsResponse)) {
log.error("Failed to propagate directory assignments because the Controller returned an invalid response type");
return true;
}
AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
Errors error = Errors.forCode(data.errorCode());
if (error != Errors.NONE) {
log.error("Failed to propagate directory assignments because the Controller returned error {}", error.name());
return true;
}
return false;
}
private static Set<AssignmentEvent> filterFailures(
AssignReplicasToDirsResponseData data,
Map<TopicIdPartition, AssignmentEvent> sent) {
Set<AssignmentEvent> failures = new HashSet<>();
Set<TopicIdPartition> acknowledged = new HashSet<>();
for (AssignReplicasToDirsResponseData.DirectoryData directory : data.directories()) {
for (AssignReplicasToDirsResponseData.TopicData topic : directory.topics()) {
for (AssignReplicasToDirsResponseData.PartitionData partition : topic.partitions()) {
TopicIdPartition topicPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex());
AssignmentEvent event = sent.get(topicPartition);
if (event == null) {
log.error("AssignReplicasToDirsResponse contains unexpected partition {} into directory {}", partition, directory.id());
} else {
acknowledged.add(topicPartition);
Errors error = Errors.forCode(partition.errorCode());
if (error != Errors.NONE) {
log.error("Controller returned error {} for assignment of partition {} into directory {}",
error.name(), partition, event.dirId);
failures.add(event);
}
}
}
}
}
for (AssignmentEvent event : sent.values()) {
if (!acknowledged.contains(event.partition)) {
log.error("AssignReplicasToDirsResponse is missing assignment of partition {} into directory {}", event.partition, event.dirId);
failures.add(event);
}
}
return failures;
}
// visible for testing
static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
TopicIdPartition topicPartition = entry.getKey();
Uuid directoryId = entry.getValue();
DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new DirectoryData().setId(directoryId));
TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
.computeIfAbsent(topicPartition.topicId(), topicId -> {
TopicData data = new TopicData().setTopicId(topicId);
directory.topics().add(data);
return data;
});
PartitionData partition = new PartitionData().setPartitionIndex(topicPartition.partitionId());
topic.partitions().add(partition);
}
return new AssignReplicasToDirsRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
}

View File

@ -35,6 +35,7 @@ import kafka.log.remote.RemoteLogManager;
import kafka.zk.KafkaZkClient; import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.server.util.Scheduler;
import scala.compat.java8.OptionConverters; import scala.compat.java8.OptionConverters;
@ -66,6 +67,7 @@ public class ReplicaManagerBuilder {
private Optional<String> threadNamePrefix = Optional.empty(); private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L; private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty(); private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
private DirectoryEventHandler directoryEventHandler = DirectoryEventHandler.NOOP;
public ReplicaManagerBuilder setConfig(KafkaConfig config) { public ReplicaManagerBuilder setConfig(KafkaConfig config) {
this.config = config; this.config = config;
@ -172,6 +174,11 @@ public class ReplicaManagerBuilder {
return this; return this;
} }
public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
this.directoryEventHandler = directoryEventHandler;
return this;
}
public ReplicaManager build() { public ReplicaManager build() {
if (config == null) config = new KafkaConfig(Collections.emptyMap()); if (config == null) config = new KafkaConfig(Collections.emptyMap());
if (metrics == null) metrics = new Metrics(); if (metrics == null) metrics = new Metrics();
@ -200,6 +207,7 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedRemoteFetchPurgatory), OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(threadNamePrefix), OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch, () -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager)); OptionConverters.toScala(addPartitionsToTxnManager),
directoryEventHandler);
} }
} }

View File

@ -85,6 +85,7 @@ trait AlterPartitionListener {
def markIsrExpand(): Unit def markIsrExpand(): Unit
def markIsrShrink(): Unit def markIsrShrink(): Unit
def markFailed(): Unit def markFailed(): Unit
def assignDir(dir: String): Unit
} }
class DelayedOperations(topicPartition: TopicPartition, class DelayedOperations(topicPartition: TopicPartition,
@ -119,6 +120,10 @@ object Partition {
} }
override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark() override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
override def assignDir(dir: String): Unit = {
replicaManager.maybeNotifyPartitionAssignedToDirectory(topicPartition, dir)
}
} }
val delayedOperations = new DelayedOperations( val delayedOperations = new DelayedOperations(
@ -480,6 +485,7 @@ class Partition(val topicPartition: TopicPartition,
if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener) if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener)
maybeLog = Some(log) maybeLog = Some(log)
updateHighWatermark(log) updateHighWatermark(log)
alterPartitionListener.assignDir(log.parentDir)
log log
} finally { } finally {
logManager.finishedInitializingLog(topicPartition, maybeLog) logManager.finishedInitializingLog(topicPartition, maybeLog)

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition} import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
import org.apache.kafka.coordinator.group.util.SystemTimerReaper import org.apache.kafka.coordinator.group.util.SystemTimerReaper
@ -43,7 +43,7 @@ import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange} import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
@ -85,6 +85,8 @@ class BrokerServer(
@volatile var lifecycleManager: BrokerLifecycleManager = _ @volatile var lifecycleManager: BrokerLifecycleManager = _
var assignmentsManager: AssignmentsManager = _
private val isShuttingDown = new AtomicBoolean(false) private val isShuttingDown = new AtomicBoolean(false)
val lock = new ReentrantLock() val lock = new ReentrantLock()
@ -277,6 +279,28 @@ class BrokerServer(
time time
) )
val assignmentsChannelManager = NodeToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
config,
"directory-assignments",
s"broker-${config.nodeId}-",
retryTimeoutMs = 60000
)
assignmentsManager = new AssignmentsManager(
time,
assignmentsChannelManager,
config.brokerId,
() => lifecycleManager.brokerEpoch
)
val directoryEventHandler = new DirectoryEventHandler {
override def handleAssignment(partition: TopicIdPartition, directoryId: Uuid): Unit =
assignmentsManager.onAssignment(partition, directoryId)
override def handleFailure(directoryId: Uuid): Unit =
lifecycleManager.propagateDirectoryFailure(directoryId)
}
this._replicaManager = new ReplicaManager( this._replicaManager = new ReplicaManager(
config = config, config = config,
metrics = metrics, metrics = metrics,
@ -294,7 +318,8 @@ class BrokerServer(
threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names. threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
delayedRemoteFetchPurgatoryParam = None, delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch, brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager) addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
directoryEventHandler = directoryEventHandler
) )
/* start token manager */ /* start token manager */
@ -648,6 +673,9 @@ class BrokerServer(
if (tokenManager != null) if (tokenManager != null)
CoreUtils.swallow(tokenManager.shutdown(), this) CoreUtils.swallow(tokenManager.shutdown(), this)
if (assignmentsManager != null)
CoreUtils.swallow(assignmentsManager.close(), this)
if (replicaManager != null) if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this) CoreUtils.swallow(replicaManager.shutdown(), this)

View File

@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName} import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName}
import kafka.server.ReplicaManager.createLogReadResult import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import kafka.utils._ import kafka.utils._
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
@ -55,6 +55,8 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common
import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
@ -269,7 +271,8 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None, delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
threadNamePrefix: Option[String] = None, threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1, val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
) extends Logging { ) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass) private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@ -2292,9 +2295,9 @@ class ReplicaManager(val config: KafkaConfig,
* The log directory failure handler for the replica * The log directory failure handler for the replica
* *
* @param dir the absolute path of the log directory * @param dir the absolute path of the log directory
* @param sendZkNotification check if we need to send notification to zookeeper node (needed for unit test) * @param notifyController check if we need to send notification to the Controller (needed for unit test)
*/ */
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = { def handleLogDirFailure(dir: String, notifyController: Boolean = true): Unit = {
if (!logManager.isLogDirOnline(dir)) if (!logManager.isLogDirOnline(dir))
return return
warn(s"Stopping serving replicas in dir $dir") warn(s"Stopping serving replicas in dir $dir")
@ -2323,16 +2326,57 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.") s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
} }
logManager.handleLogDirFailure(dir) logManager.handleLogDirFailure(dir)
if (dir == config.metadataLogDir) {
fatal(s"Shutdown broker because the metadata log dir $dir has failed")
Exit.halt(1)
}
if (sendZkNotification) if (notifyController) {
if (config.migrationEnabled) {
fatal(s"Shutdown broker because some log directory has failed during migration mode: $dir")
Exit.halt(1)
}
if (zkClient.isEmpty) { if (zkClient.isEmpty) {
warn("Unable to propagate log dir failure via Zookeeper in KRaft mode") val uuid = logManager.directoryId(dir)
if (uuid.isDefined) {
directoryEventHandler.handleFailure(uuid.get)
} else {
fatal(s"Unable to propagate directory failure disabled because directory $dir has no UUID")
Exit.halt(1)
}
} else { } else {
zkClient.get.propagateLogDirEvent(localBrokerId) zkClient.get.propagateLogDirEvent(localBrokerId)
} }
}
warn(s"Stopped serving replicas in dir $dir") warn(s"Stopped serving replicas in dir $dir")
} }
/**
* Called when a topic partition is placed in a log directory.
* If a directory event listener is configured,
* and if the selected log directory has an assigned Uuid,
* then the listener is notified of the assignment.
*/
def maybeNotifyPartitionAssignedToDirectory(tp: TopicPartition, dir: String): Unit = {
if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
logManager.directoryId(dir) match {
case None => throw new IllegalStateException(s"Assignment into unidentified directory: ${dir}")
case Some(dirId) =>
getPartition(tp) match {
case HostedPartition.Offline | HostedPartition.None =>
throw new IllegalStateException("Assignment for a partition that is not online")
case HostedPartition.Online(partition) => partition.topicId match {
case None =>
throw new IllegalStateException(s"Assignment for topic without ID: ${tp.topic()}")
case Some(topicId) =>
val topicIdPartition = new common.TopicIdPartition(topicId, tp.partition())
directoryEventHandler.handleAssignment(topicIdPartition, dirId)
}
}
}
}
}
def removeMetrics(): Unit = { def removeMetrics(): Unit = {
ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric) ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric)
} }

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class AssignmentsManagerTest {
private static final Uuid TOPIC_1 = Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
private static final Uuid TOPIC_2 = Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
private static final Uuid DIR_1 = Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
private static final Uuid DIR_2 = Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
private static final Uuid DIR_3 = Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");
private MockTime time;
private NodeToControllerChannelManager channelManager;
private AssignmentsManager manager;
@BeforeEach
public void setup() {
time = new MockTime();
channelManager = mock(NodeToControllerChannelManager.class);
manager = new AssignmentsManager(time, channelManager, 8, () -> 100L);
}
@AfterEach
void tearDown() throws InterruptedException {
manager.close();
}
@Test
void testBuildRequestData() {
Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
}};
AssignReplicasToDirsRequestData built = AssignmentsManager.buildRequestData(8, 100L, assignment);
AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
.setBrokerId(8)
.setBrokerEpoch(100L)
.setDirectories(Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_2)
.setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(2)
)),
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_2)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(5)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_3)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(3)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_1)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(4),
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(1)
))
))
));
assertEquals(expected, built);
}
@Test
public void testAssignmentAggregation() throws InterruptedException {
CountDownLatch readyToAssert = new CountDownLatch(1);
doAnswer(invocation -> {
if (readyToAssert.getCount() > 0) {
readyToAssert.countDown();
}
return null;
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class));
manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2);
manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3);
manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1);
manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2);
while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
time.sleep(100);
manager.wakeup();
}
ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor = ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
verify(channelManager).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
verifyNoMoreInteractions(channelManager);
assertEquals(1, captor.getAllValues().size());
AssignReplicasToDirsRequestData actual = captor.getValue().build().data();
AssignReplicasToDirsRequestData expected = AssignmentsManager.buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
}}
);
assertEquals(expected, actual);
}
@Test
void testRequeuesFailedAssignmentPropagations() throws InterruptedException {
CountDownLatch readyToAssert = new CountDownLatch(5);
doAnswer(invocation -> {
if (readyToAssert.getCount() > 0) {
readyToAssert.countDown();
}
if (readyToAssert.getCount() == 4) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onTimeout();
manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3);
}
if (readyToAssert.getCount() == 3) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
new ClientResponse(null, null, null, 0L, 0L, false, false,
new UnsupportedVersionException("test unsupported version exception"), null, null)
);
manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
}
if (readyToAssert.getCount() == 2) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
new ClientResponse(null, null, null, 0L, 0L, false, false, null,
new AuthenticationException("test authentication exception"), null)
);
manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
}
if (readyToAssert.getCount() == 1) {
invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
new ClientResponse(null, null, null, 0L, 0L, false, false, null, null,
new AssignReplicasToDirsResponse(new AssignReplicasToDirsResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code())
.setThrottleTimeMs(0)))
);
}
return null;
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class));
manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
time.sleep(TimeUnit.SECONDS.toMillis(1));
manager.wakeup();
}
ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor = ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
verify(channelManager, times(5)).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
verifyNoMoreInteractions(channelManager);
assertEquals(5, captor.getAllValues().size());
assertEquals(AssignmentsManager.buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
}}
), captor.getAllValues().get(0).build().data());
assertEquals(AssignmentsManager.buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
}}
), captor.getAllValues().get(1).build().data());
assertEquals(AssignmentsManager.buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
put(new TopicIdPartition(TOPIC_1, 3), Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
put(new TopicIdPartition(TOPIC_1, 4), Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
}}
), captor.getAllValues().get(4).build().data());
}
}

View File

@ -17,17 +17,16 @@
package kafka.server package kafka.server
import java.io.{ByteArrayOutputStream, File, PrintStream} import java.io.File
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util import java.util
import java.util.{Collections, Optional, OptionalInt, Properties} import java.util.{Collections, Optional, OptionalInt, Properties}
import java.util.concurrent.{CompletableFuture, TimeUnit} import java.util.concurrent.{CompletableFuture, TimeUnit}
import javax.security.auth.login.Configuration import javax.security.auth.login.Configuration
import kafka.tools.StorageTool
import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils} import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient} import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.Uuid import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time} import org.apache.kafka.common.utils.{Exit, Time}
@ -110,6 +109,7 @@ class KRaftQuorumImplementation(
setVersion(MetaPropertiesVersion.V1). setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId). setClusterId(clusterId).
setNodeId(config.nodeId). setNodeId(config.nodeId).
setDirectoryId(DirectoryId.random()).
build()) build())
}) })
copier.setPreWriteHandler((logDir, _, _) => { copier.setPreWriteHandler((logDir, _, _) => {
@ -300,25 +300,6 @@ abstract class QuorumTestHarness extends Logging {
def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
private def formatDirectories(directories: immutable.Seq[String],
metaProperties: MetaProperties): Unit = {
val stream = new ByteArrayOutputStream()
var out: PrintStream = null
try {
out = new PrintStream(stream)
val bootstrapMetadata = StorageTool.buildBootstrapMetadata(metadataVersion,
optionalMetadataRecords, "format command")
if (StorageTool.formatCommand(out, directories, metaProperties, bootstrapMetadata, metadataVersion,
ignoreFormatted = false) != 0) {
throw new RuntimeException(stream.toString())
}
debug(s"Formatted storage directory(ies) ${directories}")
} finally {
if (out != null) out.close()
stream.close()
}
}
private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = { private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
val propsList = kraftControllerConfigs() val propsList = kraftControllerConfigs()
if (propsList.size != 1) { if (propsList.size != 1) {
@ -337,7 +318,7 @@ abstract class QuorumTestHarness extends Logging {
setClusterId(Uuid.randomUuid().toString). setClusterId(Uuid.randomUuid().toString).
setNodeId(nodeId). setNodeId(nodeId).
build() build()
formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties) TestUtils.formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties, metadataVersion, optionalMetadataRecords)
val metadataRecords = new util.ArrayList[ApiMessageAndVersion] val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord(). metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().

View File

@ -38,6 +38,8 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition
import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesVersion}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.{MockTime, ShutdownableThread} import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -147,6 +149,12 @@ class ReplicaManagerConcurrencyTest {
metadataCache: MetadataCache, metadataCache: MetadataCache,
): ReplicaManager = { ): ReplicaManager = {
val logDir = TestUtils.tempDir() val logDir = TestUtils.tempDir()
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(Uuid.randomUuid().toString).
setNodeId(1).
build()
TestUtils.formatDirectories(Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latest(), None)
val props = new Properties val props = new Properties
props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345") props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345")

View File

@ -46,7 +46,7 @@ class StopReplicaRequestTest extends BaseRequestTest {
val server = servers.head val server = servers.head
val offlineDir = server.logManager.getLog(tp1).get.dir.getParent val offlineDir = server.logManager.getLog(tp1).get.dir.getParent
server.replicaManager.handleLogDirFailure(offlineDir, sendZkNotification = false) server.replicaManager.handleLogDirFailure(offlineDir, notifyController = false)
val topicStates = Seq( val topicStates = Seq(
new StopReplicaTopicState() new StopReplicaTopicState()

View File

@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration import java.time.Duration
import java.util import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit} import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit}
import java.util.{Arrays, Collections, Optional, Properties} import java.util.{Arrays, Collections, Optional, Properties}
import com.yammer.metrics.core.{Gauge, Histogram, Meter} import com.yammer.metrics.core.{Gauge, Histogram, Meter}
@ -39,6 +39,7 @@ import kafka.network.RequestChannel
import kafka.server._ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository} import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.tools.StorageTool
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import kafka.zk._ import kafka.zk._
import org.apache.kafka.admin.BrokerMetadata import org.apache.kafka.admin.BrokerMetadata
@ -69,8 +70,9 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController import org.apache.kafka.controller.QuorumController
import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@ -84,7 +86,7 @@ import org.mockito.Mockito
import scala.annotation.nowarn import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Map, Seq, mutable} import scala.collection.{Map, Seq, immutable, mutable}
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -1410,6 +1412,27 @@ object TestUtils extends Logging {
}.mkString("\n") }.mkString("\n")
} }
def formatDirectories(
directories: immutable.Seq[String],
metaProperties: MetaProperties,
metadataVersion: MetadataVersion,
optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]]
): Unit = {
val stream = new ByteArrayOutputStream()
var out: PrintStream = null
try {
out = new PrintStream(stream)
val bootstrapMetadata = StorageTool.buildBootstrapMetadata(metadataVersion, optionalMetadataRecords, "format command")
if (StorageTool.formatCommand(out, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted = false) != 0) {
throw new RuntimeException(stream.toString())
}
debug(s"Formatted storage directory(ies) ${directories}")
} finally {
if (out != null) out.close()
stream.close()
}
}
/** /**
* Create new LogManager instance with default configuration for testing * Create new LogManager instance with default configuration for testing
*/ */
@ -1504,6 +1527,7 @@ object TestUtils extends Logging {
val expands: AtomicInteger = new AtomicInteger(0) val expands: AtomicInteger = new AtomicInteger(0)
val shrinks: AtomicInteger = new AtomicInteger(0) val shrinks: AtomicInteger = new AtomicInteger(0)
val failures: AtomicInteger = new AtomicInteger(0) val failures: AtomicInteger = new AtomicInteger(0)
val directory: AtomicReference[String] = new AtomicReference[String]()
override def markIsrExpand(): Unit = expands.incrementAndGet() override def markIsrExpand(): Unit = expands.incrementAndGet()
@ -1511,10 +1535,13 @@ object TestUtils extends Logging {
override def markFailed(): Unit = failures.incrementAndGet() override def markFailed(): Unit = failures.incrementAndGet()
override def assignDir(dir: String): Unit = directory.set(dir)
def reset(): Unit = { def reset(): Unit = {
expands.set(0) expands.set(0)
shrinks.set(0) shrinks.set(0)
failures.set(0) failures.set(0)
directory.set(null)
} }
} }

View File

@ -189,7 +189,7 @@ public class ControllerRegistration {
setMaxSupportedVersion(entry.getValue().max())); setMaxSupportedVersion(entry.getValue().max()));
} }
return new ApiMessageAndVersion(registrationRecord, return new ApiMessageAndVersion(registrationRecord,
options.metadataVersion().registerBrokerRecordVersion()); options.metadataVersion().registerControllerRecordVersion());
} }
@Override @Override

View File

@ -107,6 +107,25 @@ public interface EventQueue extends AutoCloseable {
} }
} }
class LatestDeadlineFunction implements Function<OptionalLong, OptionalLong> {
private final long newDeadlineNs;
public LatestDeadlineFunction(long newDeadlineNs) {
this.newDeadlineNs = newDeadlineNs;
}
@Override
public OptionalLong apply(OptionalLong prevDeadlineNs) {
if (!prevDeadlineNs.isPresent()) {
return OptionalLong.of(newDeadlineNs);
} else if (prevDeadlineNs.getAsLong() > newDeadlineNs) {
return prevDeadlineNs;
} else {
return OptionalLong.of(newDeadlineNs);
}
}
}
class VoidEvent implements Event { class VoidEvent implements Event {
public final static VoidEvent INSTANCE = new VoidEvent(); public final static VoidEvent INSTANCE = new VoidEvent();

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import org.apache.kafka.common.Uuid;
public interface DirectoryEventHandler {
/**
* A no-op implementation of {@link DirectoryEventHandler}.
*/
DirectoryEventHandler NOOP = new DirectoryEventHandler() {
@Override public void handleAssignment(TopicIdPartition partition, Uuid directoryId) {}
@Override public void handleFailure(Uuid directoryId) {}
};
/**
* Handle the assignment of a topic partition to a directory.
* @param directoryId The directory ID
* @param partition The topic partition
*/
void handleAssignment(TopicIdPartition partition, Uuid directoryId);
/**
* Handle the transition of an online log directory to the offline state.
* @param directoryId The directory ID
*/
void handleFailure(Uuid directoryId);
}