KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#15042)

Directory changes carried in the logDirs field must be applied
when replaying a BrokerRegistrationChangeRecord. Without this,
directory failures are not persisted in the cluster metadata,
a gap that #14902 left open.

Co-authored-by: Igor Soarez <soarez@apple.com>
Reviewers: Omnia G.H Ibrahim <o.g.h.ibrahim@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
This commit is contained in:
Viktor Somogyi-Vass 2023-12-18 21:44:30 +01:00 committed by GitHub
parent 3805829304
commit dc589904dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 10 deletions

View File

@ -534,7 +534,8 @@ public class ClusterControlManager {
record.id(), record.id(),
record.epoch(), record.epoch(),
BrokerRegistrationFencingChange.FENCE.asBoolean(), BrokerRegistrationFencingChange.FENCE.asBoolean(),
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean() BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
Optional.empty()
); );
} }
@ -544,7 +545,8 @@ public class ClusterControlManager {
record.id(), record.id(),
record.epoch(), record.epoch(),
BrokerRegistrationFencingChange.UNFENCE.asBoolean(), BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean() BrokerRegistrationInControlledShutdownChange.NONE.asBoolean(),
Optional.empty()
); );
} }
@ -557,12 +559,14 @@ public class ClusterControlManager {
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow( BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " + () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for inControlledShutdown field: %x", record, record.inControlledShutdown()))); "value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
replayRegistrationChange( replayRegistrationChange(
record, record,
record.brokerId(), record.brokerId(),
record.brokerEpoch(), record.brokerEpoch(),
fencingChange.asBoolean(), fencingChange.asBoolean(),
inControlledShutdownChange.asBoolean() inControlledShutdownChange.asBoolean(),
directoriesChange
); );
} }
@ -571,7 +575,8 @@ public class ClusterControlManager {
int brokerId, int brokerId,
long brokerEpoch, long brokerEpoch,
Optional<Boolean> fencingChange, Optional<Boolean> fencingChange,
Optional<Boolean> inControlledShutdownChange Optional<Boolean> inControlledShutdownChange,
Optional<List<Uuid>> directoriesChange
) { ) {
BrokerRegistration curRegistration = brokerRegistrations.get(brokerId); BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
if (curRegistration == null) { if (curRegistration == null) {
@ -583,7 +588,8 @@ public class ClusterControlManager {
} else { } else {
BrokerRegistration nextRegistration = curRegistration.cloneWith( BrokerRegistration nextRegistration = curRegistration.cloneWith(
fencingChange, fencingChange,
inControlledShutdownChange inControlledShutdownChange,
directoriesChange
); );
if (!curRegistration.equals(nextRegistration)) { if (!curRegistration.equals(nextRegistration)) {
log.info("Replayed {} modifying the registration for broker {}: {}", log.info("Replayed {} modifying the registration for broker {}: {}",

View File

@ -1401,10 +1401,13 @@ public class ReplicationControlManager {
"handleDirectoriesOffline[" + brokerId + ":" + newOfflineDir + "]", "handleDirectoriesOffline[" + brokerId + ":" + newOfflineDir + "]",
brokerId, NO_LEADER, records, iterator); brokerId, NO_LEADER, records, iterator);
} }
List<Uuid> newOnlineDirs = registration.directoryDifference(offlineDirs);
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch). setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
setLogDirs(registration.directoryDifference(offlineDirs)), setLogDirs(newOnlineDirs),
(short) 2)); (short) 2));
log.warn("Directories {} in broker {} marked offline, remaining directories: {}",
newOfflineDirs, brokerId, newOnlineDirs);
} }
} }
@ -2126,6 +2129,11 @@ public class ReplicationControlManager {
if (directoryIsOffline) { if (directoryIsOffline) {
leaderAndIsrUpdates.add(new TopicIdPartition(topicId, partitionIndex)); leaderAndIsrUpdates.add(new TopicIdPartition(topicId, partitionIndex));
} }
if (log.isDebugEnabled()) {
log.debug("Broker {} assigned partition {}:{} to {} dir {}",
brokerId, topics.get(topicId).name(), partitionIndex,
directoryIsOffline ? "OFFLINE" : "ONLINE", dirId);
}
} }
} }
resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData(). resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData().

View File

@ -17,6 +17,7 @@
package org.apache.kafka.image; package org.apache.kafka.image;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@ -30,6 +31,7 @@ import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
@ -112,6 +114,7 @@ public final class ClusterDelta {
BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "fence"); BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "fence");
changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith( changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
BrokerRegistrationFencingChange.FENCE.asBoolean(), BrokerRegistrationFencingChange.FENCE.asBoolean(),
Optional.empty(),
Optional.empty() Optional.empty()
))); )));
} }
@ -120,6 +123,7 @@ public final class ClusterDelta {
BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "unfence"); BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "unfence");
changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith( changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
BrokerRegistrationFencingChange.UNFENCE.asBoolean(), BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
Optional.empty(),
Optional.empty() Optional.empty()
))); )));
} }
@ -135,9 +139,11 @@ public final class ClusterDelta {
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow( BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " + () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for inControlledShutdown field: %d", record, record.inControlledShutdown()))); "value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
BrokerRegistration nextRegistration = curRegistration.cloneWith( BrokerRegistration nextRegistration = curRegistration.cloneWith(
fencingChange.asBoolean(), fencingChange.asBoolean(),
inControlledShutdownChange.asBoolean() inControlledShutdownChange.asBoolean(),
directoriesChange
); );
if (!curRegistration.equals(nextRegistration)) { if (!curRegistration.equals(nextRegistration)) {
changedBrokers.put(record.brokerId(), Optional.of(nextRegistration)); changedBrokers.put(record.brokerId(), Optional.of(nextRegistration));

View File

@ -390,12 +390,14 @@ public class BrokerRegistration {
public BrokerRegistration cloneWith( public BrokerRegistration cloneWith(
Optional<Boolean> fencingChange, Optional<Boolean> fencingChange,
Optional<Boolean> inControlledShutdownChange Optional<Boolean> inControlledShutdownChange,
Optional<List<Uuid>> directoriesChange
) { ) {
boolean newFenced = fencingChange.orElse(fenced); boolean newFenced = fencingChange.orElse(fenced);
boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown); boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
List<Uuid> newDirectories = directoriesChange.orElse(directories);
if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown) if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown && newDirectories.equals(directories))
return this; return this;
return new BrokerRegistration( return new BrokerRegistration(
@ -408,7 +410,7 @@ public class BrokerRegistration {
newFenced, newFenced,
newInControlledShutdownChange, newInControlledShutdownChange,
isMigratingZkBroker, isMigratingZkBroker,
directories newDirectories
); );
} }
} }

View File

@ -2979,6 +2979,8 @@ public class ReplicationControlManagerTest {
sortPartitionChangeRecords(filter(records, PartitionChangeRecord.class)) sortPartitionChangeRecords(filter(records, PartitionChangeRecord.class))
); );
assertEquals(3, records.size()); assertEquals(3, records.size());
ctx.replay(records);
assertEquals(Collections.singletonList(dir2b1), ctx.clusterControl.registration(b1).directories());
} }
/** /**