mirror of https://github.com/apache/kafka.git
KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#14998)
Any directory changes must be taken into account when replaying a BrokerRegistrationChangeRecord. This is necessary to persist directory failures in the cluster metadata, which #14902 failed to do.

Reviewers: Omnia G.H Ibrahim <o.g.h.ibrahim@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
This commit is contained in:
parent
6b02309819
commit
f385ef468b
|
@ -534,7 +534,8 @@ public class ClusterControlManager {
|
|||
record.id(),
|
||||
record.epoch(),
|
||||
BrokerRegistrationFencingChange.FENCE.asBoolean(),
|
||||
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
|
||||
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
|
||||
Optional.empty()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -544,7 +545,8 @@ public class ClusterControlManager {
|
|||
record.id(),
|
||||
record.epoch(),
|
||||
BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
|
||||
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
|
||||
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
|
||||
Optional.empty()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -557,12 +559,14 @@ public class ClusterControlManager {
|
|||
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
|
||||
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
|
||||
"value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
|
||||
Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
|
||||
replayRegistrationChange(
|
||||
record,
|
||||
record.brokerId(),
|
||||
record.brokerEpoch(),
|
||||
fencingChange.asBoolean(),
|
||||
inControlledShutdownChange.asBoolean()
|
||||
inControlledShutdownChange.asBoolean(),
|
||||
directoriesChange
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -571,7 +575,8 @@ public class ClusterControlManager {
|
|||
int brokerId,
|
||||
long brokerEpoch,
|
||||
Optional<Boolean> fencingChange,
|
||||
Optional<Boolean> inControlledShutdownChange
|
||||
Optional<Boolean> inControlledShutdownChange,
|
||||
Optional<List<Uuid>> directoriesChange
|
||||
) {
|
||||
BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
|
||||
if (curRegistration == null) {
|
||||
|
@ -583,7 +588,8 @@ public class ClusterControlManager {
|
|||
} else {
|
||||
BrokerRegistration nextRegistration = curRegistration.cloneWith(
|
||||
fencingChange,
|
||||
inControlledShutdownChange
|
||||
inControlledShutdownChange,
|
||||
directoriesChange
|
||||
);
|
||||
if (!curRegistration.equals(nextRegistration)) {
|
||||
log.info("Replayed {} modifying the registration for broker {}: {}",
|
||||
|
|
|
@ -1401,10 +1401,13 @@ public class ReplicationControlManager {
|
|||
"handleDirectoriesOffline[" + brokerId + ":" + newOfflineDir + "]",
|
||||
brokerId, NO_LEADER, records, iterator);
|
||||
}
|
||||
List<Uuid> newOnlineDirs = registration.directoryDifference(offlineDirs);
|
||||
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
|
||||
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
|
||||
setLogDirs(registration.directoryDifference(offlineDirs)),
|
||||
setLogDirs(newOnlineDirs),
|
||||
(short) 2));
|
||||
log.warn("Directories {} in broker {} marked offline, remaining directories: {}",
|
||||
newOfflineDirs, brokerId, newOnlineDirs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2126,6 +2129,11 @@ public class ReplicationControlManager {
|
|||
if (directoryIsOffline) {
|
||||
leaderAndIsrUpdates.add(new TopicIdPartition(topicId, partitionIndex));
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Broker {} assigned partition {}:{} to {} dir {}",
|
||||
brokerId, topics.get(topicId).name(), partitionIndex,
|
||||
directoryIsOffline ? "OFFLINE" : "ONLINE", dirId);
|
||||
}
|
||||
}
|
||||
}
|
||||
resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData().
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.kafka.image;
|
||||
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
|
||||
import org.apache.kafka.common.metadata.FenceBrokerRecord;
|
||||
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
|
||||
|
@ -30,6 +31,7 @@ import org.apache.kafka.metadata.ControllerRegistration;
|
|||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
|
@ -112,6 +114,7 @@ public final class ClusterDelta {
|
|||
BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "fence");
|
||||
changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
|
||||
BrokerRegistrationFencingChange.FENCE.asBoolean(),
|
||||
Optional.empty(),
|
||||
Optional.empty()
|
||||
)));
|
||||
}
|
||||
|
@ -120,6 +123,7 @@ public final class ClusterDelta {
|
|||
BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "unfence");
|
||||
changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
|
||||
BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
|
||||
Optional.empty(),
|
||||
Optional.empty()
|
||||
)));
|
||||
}
|
||||
|
@ -135,9 +139,11 @@ public final class ClusterDelta {
|
|||
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
|
||||
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
|
||||
"value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
|
||||
Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
|
||||
BrokerRegistration nextRegistration = curRegistration.cloneWith(
|
||||
fencingChange.asBoolean(),
|
||||
inControlledShutdownChange.asBoolean()
|
||||
inControlledShutdownChange.asBoolean(),
|
||||
directoriesChange
|
||||
);
|
||||
if (!curRegistration.equals(nextRegistration)) {
|
||||
changedBrokers.put(record.brokerId(), Optional.of(nextRegistration));
|
||||
|
|
|
@ -390,12 +390,14 @@ public class BrokerRegistration {
|
|||
|
||||
public BrokerRegistration cloneWith(
|
||||
Optional<Boolean> fencingChange,
|
||||
Optional<Boolean> inControlledShutdownChange
|
||||
Optional<Boolean> inControlledShutdownChange,
|
||||
Optional<List<Uuid>> directoriesChange
|
||||
) {
|
||||
boolean newFenced = fencingChange.orElse(fenced);
|
||||
boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
|
||||
List<Uuid> newDirectories = directoriesChange.orElse(directories);
|
||||
|
||||
if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown)
|
||||
if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown && newDirectories.equals(directories))
|
||||
return this;
|
||||
|
||||
return new BrokerRegistration(
|
||||
|
@ -408,7 +410,7 @@ public class BrokerRegistration {
|
|||
newFenced,
|
||||
newInControlledShutdownChange,
|
||||
isMigratingZkBroker,
|
||||
directories
|
||||
newDirectories
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2979,6 +2979,8 @@ public class ReplicationControlManagerTest {
|
|||
sortPartitionChangeRecords(filter(records, PartitionChangeRecord.class))
|
||||
);
|
||||
assertEquals(3, records.size());
|
||||
ctx.replay(records);
|
||||
assertEquals(Collections.singletonList(dir2b1), ctx.clusterControl.registration(b1).directories());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue