MINOR: MetaProperties refactor, part 1 (#14678)
Since directory.id was added to MetaProperties, it is no longer safe to assume that every directory on a node contains the same MetaProperties. We should therefore eliminate the places where a single MetaProperties object is used to represent the settings of an entire cluster. This PR removes a few such cases; in each, passing only the cluster ID is sufficient.

The second part of this change refactors KafkaClusterTestKit so that paths are converted to absolute before the BrokerNode and ControllerNode objects are created, rather than after. This prepares the way for storing an ensemble of MetaProperties objects in BrokerNode and ControllerNode, which a follow-up change will do.

Reviewers: Ron Dagostino <rndgstn@gmail.com>
Parent: 57662efec9
Commit: 4d8efa94cb
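The first half of the change is visible in the RaftManager.scala hunks below: KafkaRaftManager stops taking a whole MetaProperties and takes only clusterId: String. A self-contained Scala sketch of the principle, using toy stand-in classes rather than Kafka's real signatures:

// Toy stand-in for kafka.server.MetaProperties (the real one also gains a
// per-directory directory.id, so no single instance can safely speak for
// every directory on a node).
final case class MetaProperties(clusterId: String, nodeId: Int)

// Before: one MetaProperties object stands in for the whole node.
class RaftComponentBefore(metaProperties: MetaProperties) {
  def describeCluster(): String = s"cluster=${metaProperties.clusterId}"
}

// After: only the cluster ID, the one field actually used, crosses the boundary.
class RaftComponentAfter(clusterId: String) {
  def describeCluster(): String = s"cluster=$clusterId"
}

object Sketch extends App {
  val props = MetaProperties(clusterId = "JgxuGe9URy-E-ceaL04lEw", nodeId = 1)
  println(new RaftComponentBefore(props).describeCluster()) // cluster=JgxuGe9URy-E-ceaL04lEw
  println(new RaftComponentAfter(props.clusterId).describeCluster()) // same output
}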
RaftManager.scala

@@ -26,7 +26,7 @@ import kafka.log.LogManager
 import kafka.log.UnifiedLog
 import kafka.raft.KafkaRaftManager.RaftIoThread
 import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.FileLock
 import kafka.utils.Logging
@@ -128,7 +128,7 @@ trait RaftManager[T] {
 }
 
 class KafkaRaftManager[T](
-  metaProperties: MetaProperties,
+  clusterId: String,
   config: KafkaConfig,
   recordSerde: RecordSerde[T],
   topicPartition: TopicPartition,
@@ -241,7 +241,7 @@ class KafkaRaftManager[T](
       metrics,
       expirationService,
       logContext,
-      metaProperties.clusterId,
+      clusterId,
      nodeId,
      raftConfig
    )
ControllerApis.scala

@@ -73,7 +73,7 @@ class ControllerApis(
   val controller: Controller,
   val raftManager: RaftManager[ApiMessageAndVersion],
   val config: KafkaConfig,
-  val metaProperties: MetaProperties,
+  val clusterId: String,
   val registrationsPublisher: ControllerRegistrationsPublisher,
   val apiVersionManager: ApiVersionManager,
   val metadataCache: KRaftMetadataCache
@@ -1067,7 +1067,7 @@ class ControllerApis(
     val response = authHelper.computeDescribeClusterResponse(
       request,
       EndpointType.CONTROLLER,
-      metaProperties.clusterId,
+      clusterId,
       () => registrationsPublisher.describeClusterControllers(request.context.listenerName()),
       () => raftManager.leaderAndEpoch.leaderId().orElse(-1)
     )
ControllerServer.scala

@@ -137,7 +137,7 @@ class ControllerServer(
     true
   }
 
-  def clusterId: String = sharedServer.metaProps.clusterId
+  def clusterId: String = sharedServer.clusterId()
 
   def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
@@ -319,7 +319,7 @@ class ControllerServer(
         controller,
         raftManager,
         config,
-        sharedServer.metaProps,
+        clusterId,
         registrationsPublisher,
         apiVersionManager,
         metadataCache)
KafkaServer.scala

@@ -387,11 +387,10 @@ class KafkaServer(
         isZkBroker = true)
 
       // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
-      val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
       val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
         RaftConfig.parseVoterConnections(config.quorumVoters))
       val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
-        kraftMetaProps,
+        clusterId,
         config,
         new MetadataRecordSerde,
         KafkaRaftServer.MetadataPartition,
@@ -436,7 +435,7 @@ class KafkaServer(
       lifecycleManager.start(
         () => listener.highestOffset,
         brokerToQuorumChannelManager,
-        kraftMetaProps.clusterId,
+        clusterId,
         networkListeners,
         ibpAsFeature,
         OptionalLong.empty()
SharedServer.scala

@@ -110,6 +110,8 @@ class SharedServer(
   @volatile var snapshotGenerator: SnapshotGenerator = _
   @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
 
+  def clusterId(): String = metaProps.clusterId
+
   def isUsed(): Boolean = synchronized {
     usedByController || usedByBroker
   }
@@ -248,7 +250,7 @@ class SharedServer(
         controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
       }
       val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
-        metaProps,
+        clusterId(),
         sharedServerConfig,
         new MetadataRecordSerde,
         KafkaRaftServer.MetadataPartition,
TestRaftServer.scala

@@ -23,7 +23,7 @@ import joptsimple.OptionException
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.{KafkaRaftManager, RaftManager}
 import kafka.security.CredentialProvider
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
 import kafka.utils.{CoreUtils, Exit, Logging}
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -82,13 +82,8 @@ class TestRaftServer(
       () => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
     socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
 
-    val metaProperties = MetaProperties(
-      clusterId = Uuid.ZERO_UUID.toString,
-      nodeId = config.nodeId
-    )
-
     raftManager = new KafkaRaftManager[Array[Byte]](
-      metaProperties,
+      Uuid.ZERO_UUID.toString,
       config,
       new ByteArraySerde,
       partition,
BrokerNode.java

@@ -49,7 +49,9 @@ public class BrokerNode implements TestKitNode {
             return this;
         }
 
-        public BrokerNode build() {
+        BrokerNode build(
+            String baseDirectory
+        ) {
             if (id == -1) {
                 throw new RuntimeException("You must set the node id");
             }
@@ -60,9 +62,11 @@ public class BrokerNode implements TestKitNode {
                 logDataDirectories = Collections.
                     singletonList(String.format("broker_%d_data0", id));
             }
+            logDataDirectories = TestKitNodes.absolutize(baseDirectory, logDataDirectories);
             if (metadataDirectory == null) {
                 metadataDirectory = logDataDirectories.get(0);
             }
+            metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory);
             return new BrokerNode(id, incarnationId, metadataDirectory,
                 logDataDirectories);
         }
ControllerNode.java

@@ -32,13 +32,16 @@ public class ControllerNode implements TestKitNode {
             return this;
         }
 
-        public ControllerNode build() {
+        public ControllerNode build(
+            String baseDirectory
+        ) {
             if (id == -1) {
                 throw new RuntimeException("You must set the node id");
             }
             if (metadataDirectory == null) {
                 metadataDirectory = String.format("controller_%d", id);
             }
+            metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory);
             return new ControllerNode(id, metadataDirectory);
         }
     }
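Both node builders now take the base directory at build time, so directory resolution happens before the node object exists. A hedged Scala usage sketch (assumes the Kafka test classpath; build(baseDirectory) is the method added above, and the literal base path is hypothetical — in practice TestKitNodes.Builder.build() supplies a fresh temp dir):

import kafka.testkit.ControllerNode

val node = new ControllerNode.Builder().
  setId(3000).
  build("/tmp/kafka_testkit")   // hypothetical base directory
// With the default metadataDirectory "controller_3000", and assuming absolutize
// joins relative names onto the base, node.metadataDirectory() comes out as
// "/tmp/kafka_testkit/controller_3000".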
KafkaClusterTestKit.java

@@ -216,11 +216,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
             ExecutorService executorService = null;
             ControllerQuorumVotersFutureManager connectFutureManager =
                 new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
-            File baseDirectory = null;
+            File baseDirectory = new File(nodes.baseDirectory());
 
             try {
-                baseDirectory = TestUtils.tempDirectory();
-                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
                 executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
                     ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
                 for (ControllerNode node : nodes.controllerNodes().values()) {
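The hunk above flips ownership: KafkaClusterTestKit no longer creates the temp directory and rewrites paths afterward via copyWithAbsolutePaths; it reads the base directory that TestKitNodes.build() already created, with absolute paths baked in. A minimal Scala sketch of the resulting call pattern (assumes the Kafka test classpath):

import kafka.testkit.TestKitNodes

val nodes = new TestKitNodes.Builder().
  setNumControllerNodes(1).
  setNumBrokerNodes(1).
  build()                        // creates the temp base directory itself
val baseDirectory = new java.io.File(nodes.baseDirectory())   // already absolute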
TestKitNodes.java

@@ -20,14 +20,17 @@ package kafka.testkit;
 import kafka.server.MetaProperties;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
@@ -36,8 +39,8 @@ public class TestKitNodes {
         private boolean combined = false;
         private Uuid clusterId = null;
         private MetadataVersion bootstrapMetadataVersion = null;
-        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
-        private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
+        private final NavigableMap<Integer, ControllerNode.Builder> controllerNodeBuilders = new TreeMap<>();
+        private final NavigableMap<Integer, BrokerNode.Builder> brokerNodeBuilders = new TreeMap<>();
 
         public Builder setClusterId(Uuid clusterId) {
             this.clusterId = clusterId;
@@ -54,42 +57,21 @@ public class TestKitNodes {
             return this;
         }
 
-        public Builder addNodes(TestKitNode[] nodes) {
-            for (TestKitNode node : nodes) {
-                addNode(node);
-            }
-            return this;
-        }
-
-        public Builder addNode(TestKitNode node) {
-            if (node instanceof ControllerNode) {
-                ControllerNode controllerNode = (ControllerNode) node;
-                controllerNodes.put(node.id(), controllerNode);
-            } else if (node instanceof BrokerNode) {
-                BrokerNode brokerNode = (BrokerNode) node;
-                brokerNodes.put(node.id(), brokerNode);
-            } else {
-                throw new RuntimeException("Can't handle TestKitNode subclass " +
-                    node.getClass().getSimpleName());
-            }
-            return this;
-        }
-
         public Builder setNumControllerNodes(int numControllerNodes) {
             if (numControllerNodes < 0) {
                 throw new RuntimeException("Invalid negative value for numControllerNodes");
             }
 
-            while (controllerNodes.size() > numControllerNodes) {
-                controllerNodes.pollFirstEntry();
+            while (controllerNodeBuilders.size() > numControllerNodes) {
+                controllerNodeBuilders.pollFirstEntry();
             }
-            while (controllerNodes.size() < numControllerNodes) {
+            while (controllerNodeBuilders.size() < numControllerNodes) {
                 int nextId = startControllerId();
-                if (!controllerNodes.isEmpty()) {
-                    nextId = controllerNodes.lastKey() + 1;
+                if (!controllerNodeBuilders.isEmpty()) {
+                    nextId = controllerNodeBuilders.lastKey() + 1;
                 }
-                controllerNodes.put(nextId, new ControllerNode.Builder().
-                    setId(nextId).build());
+                controllerNodeBuilders.put(nextId, new ControllerNode.Builder().
+                    setId(nextId));
             }
             return this;
         }
@@ -98,16 +80,16 @@ public class TestKitNodes {
             if (numBrokerNodes < 0) {
                 throw new RuntimeException("Invalid negative value for numBrokerNodes");
             }
-            while (brokerNodes.size() > numBrokerNodes) {
-                brokerNodes.pollFirstEntry();
+            while (brokerNodeBuilders.size() > numBrokerNodes) {
+                brokerNodeBuilders.pollFirstEntry();
             }
-            while (brokerNodes.size() < numBrokerNodes) {
+            while (brokerNodeBuilders.size() < numBrokerNodes) {
                 int nextId = startBrokerId();
-                if (!brokerNodes.isEmpty()) {
-                    nextId = brokerNodes.lastKey() + 1;
+                if (!brokerNodeBuilders.isEmpty()) {
+                    nextId = brokerNodeBuilders.lastKey() + 1;
                 }
-                brokerNodes.put(nextId, new BrokerNode.Builder().
-                    setId(nextId).build());
+                brokerNodeBuilders.put(nextId, new BrokerNode.Builder().
+                    setId(nextId));
             }
             return this;
         }
@@ -119,7 +101,35 @@ public class TestKitNodes {
             if (bootstrapMetadataVersion == null) {
                 bootstrapMetadataVersion = MetadataVersion.latest();
             }
-            return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
+            String baseDirectory = TestUtils.tempDirectory("kafka_" + clusterId).getAbsolutePath();
+            try {
+                NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
+                for (ControllerNode.Builder controllerNodeBuilder : controllerNodeBuilders.values()) {
+                    ControllerNode controllerNode = controllerNodeBuilder.build(baseDirectory);
+                    if (controllerNodes.put(controllerNode.id(), controllerNode) != null) {
+                        throw new RuntimeException("More than one controller claimed ID " + controllerNode.id());
+                    }
+                }
+                NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
+                for (BrokerNode.Builder brokerNodeBuilder : brokerNodeBuilders.values()) {
+                    BrokerNode brokerNode = brokerNodeBuilder.build(baseDirectory);
+                    if (brokerNodes.put(brokerNode.id(), brokerNode) != null) {
+                        throw new RuntimeException("More than one broker claimed ID " + brokerNode.id());
+                    }
+                }
+                return new TestKitNodes(baseDirectory,
+                    clusterId,
+                    bootstrapMetadataVersion,
+                    controllerNodes,
+                    brokerNodes);
+            } catch (Exception e) {
+                try {
+                    Utils.delete(new File(baseDirectory));
+                } catch (IOException x) {
+                    throw new RuntimeException(x);
+                }
+                throw e;
+            }
         }
 
         private int startBrokerId() {
@@ -134,6 +144,7 @@ public class TestKitNodes {
         }
     }
 
+    private final String baseDirectory;
     private final Uuid clusterId;
     private final MetadataVersion bootstrapMetadataVersion;
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
@@ -143,16 +154,24 @@ public class TestKitNodes {
         return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
     }
 
-    private TestKitNodes(Uuid clusterId,
-                         MetadataVersion bootstrapMetadataVersion,
-                         NavigableMap<Integer, ControllerNode> controllerNodes,
-                         NavigableMap<Integer, BrokerNode> brokerNodes) {
+    private TestKitNodes(
+        String baseDirectory,
+        Uuid clusterId,
+        MetadataVersion bootstrapMetadataVersion,
+        NavigableMap<Integer, ControllerNode> controllerNodes,
+        NavigableMap<Integer, BrokerNode> brokerNodes
+    ) {
+        this.baseDirectory = baseDirectory;
         this.clusterId = clusterId;
        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
         this.controllerNodes = controllerNodes;
         this.brokerNodes = brokerNodes;
     }
 
+    public String baseDirectory() {
+        return baseDirectory;
+    }
+
     public Uuid clusterId() {
         return clusterId;
     }
@@ -189,24 +208,7 @@ public class TestKitNodes {
         return new ListenerName("CONTROLLER");
     }
 
-    public TestKitNodes copyWithAbsolutePaths(String baseDirectory) {
-        NavigableMap<Integer, ControllerNode> newControllerNodes = new TreeMap<>();
-        NavigableMap<Integer, BrokerNode> newBrokerNodes = new TreeMap<>();
-        for (Entry<Integer, ControllerNode> entry : controllerNodes.entrySet()) {
-            ControllerNode node = entry.getValue();
-            newControllerNodes.put(entry.getKey(), new ControllerNode(node.id(),
-                absolutize(baseDirectory, node.metadataDirectory())));
-        }
-        for (Entry<Integer, BrokerNode> entry : brokerNodes.entrySet()) {
-            BrokerNode node = entry.getValue();
-            newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(),
-                node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()),
-                absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides()));
-        }
-        return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes);
-    }
-
-    private static List<String> absolutize(String base, Collection<String> directories) {
+    static List<String> absolutize(String base, Collection<String> directories) {
         List<String> newDirectories = new ArrayList<>();
         for (String directory : directories) {
             newDirectories.add(absolutize(base, directory));
@@ -214,7 +216,7 @@ public class TestKitNodes {
         return newDirectories;
     }
 
-    private static String absolutize(String base, String directory) {
+    static String absolutize(String base, String directory) {
         if (Paths.get(directory).isAbsolute()) {
             return directory;
         }
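Only half of the single-path absolutize is visible above: absolute inputs pass through unchanged, and the else branch is cut off in this view. Given how the builders use it, the missing branch must resolve relative names against the base; a toy Scala restatement under that assumption:

def absolutize(base: String, directory: String): String =
  if (java.nio.file.Paths.get(directory).isAbsolute) directory
  else new java.io.File(base, directory).getAbsolutePath   // assumed else branch

// absolutize("/tmp/kafka_x", "broker_0_data0")  => "/tmp/kafka_x/broker_0_data0"
// absolutize("/tmp/kafka_x", "/var/data")       => "/var/data"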
RaftManagerTest.scala

@@ -28,7 +28,6 @@ import kafka.server.KafkaConfig
 import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.server.KafkaRaftServer.ProcessRole
-import kafka.server.MetaProperties
 import kafka.utils.TestUtils
 import kafka.tools.TestRaftServer.ByteArraySerde
 import org.apache.kafka.common.TopicPartition
@@ -83,13 +82,9 @@ class RaftManagerTest {
     config: KafkaConfig
   ): KafkaRaftManager[Array[Byte]] = {
     val topicId = new Uuid(0L, 2L)
-    val metaProperties = MetaProperties(
-      clusterId = Uuid.randomUuid.toString,
-      nodeId = config.nodeId
-    )
 
     new KafkaRaftManager[Array[Byte]](
-      metaProperties,
+      Uuid.randomUuid.toString,
       config,
       new ByteArraySerde,
       topicPartition,
ControllerApisTest.scala

@@ -159,7 +159,7 @@ class ControllerApisTest {
       controller,
       raftManager,
       new KafkaConfig(props),
-      MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
+      "JgxuGe9URy-E-ceaL04lEw",
       new ControllerRegistrationsPublisher(),
       new SimpleApiVersionManager(
         ListenerType.CONTROLLER,