KAFKA-13850: Show missing record type in MetadataShell (#12103)

AccessControlEntryRecord and RemoveAccessControlEntryRecord were added in KIP-801, FeatureLevelRecord was added in KIP-778, BrokerRegistrationChangeRecord was added in KIP-841, and NoOpRecord was added in KIP-835. This change adds handling for these 5 record types to MetadataShell.

 Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
dengziming 2022-08-25 14:09:01 +08:00 committed by GitHub
parent b988528b91
commit 19581effbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 164 additions and 1 deletions

View File

@ -278,6 +278,10 @@
<!-- Shell -->
<suppress checks="CyclomaticComplexity"
files="(GlobComponent|MetadataNodeManager).java"/>
<suppress checks="MethodLength"
files="(MetadataNodeManager).java"/>
<suppress checks="JavaNCSS"
files="(MetadataNodeManager).java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"

View File

@ -23,6 +23,6 @@
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The feature name." },
{ "name": "FeatureLevel", "type": "int16", "versions": "0+",
"about": "The current finalized feature level of this feature for the cluster." }
"about": "The current finalized feature level of this feature for the cluster, a value of 0 means feature not supported." }
]
}

View File

@ -21,9 +21,14 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
@ -31,6 +36,7 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@ -39,6 +45,8 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
@ -302,6 +310,22 @@ public final class MetadataNodeManager implements AutoCloseable {
create("isFenced").setContents("false");
break;
}
case BROKER_REGISTRATION_CHANGE_RECORD: {
BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord) message;
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).get();
if (fencingChange != BrokerRegistrationFencingChange.NONE) {
data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
.create("isFenced").setContents(Boolean.toString(fencingChange.asBoolean().get()));
}
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).get();
if (inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
.create("inControlledShutdown").setContents(Boolean.toString(inControlledShutdownChange.asBoolean().get()));
}
break;
}
case REMOVE_TOPIC_RECORD: {
RemoveTopicRecord record = (RemoveTopicRecord) message;
DirectoryNode topicsDirectory =
@ -333,6 +357,35 @@ public final class MetadataNodeManager implements AutoCloseable {
producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + "");
break;
}
case ACCESS_CONTROL_ENTRY_RECORD: {
AccessControlEntryRecord record = (AccessControlEntryRecord) message;
DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
FileNode file = acls.create(record.id().toString());
file.setContents(AccessControlEntryRecordJsonConverter.write(record,
AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
break;
}
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
RemoveAccessControlEntryRecord record = (RemoveAccessControlEntryRecord) message;
DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
acls.rmrf(record.id().toString());
break;
}
case FEATURE_LEVEL_RECORD: {
FeatureLevelRecord record = (FeatureLevelRecord) message;
DirectoryNode features = data.root.mkdirs("features");
if (record.featureLevel() == 0) {
features.rmrf(record.name());
} else {
FileNode file = features.create(record.name());
file.setContents(FeatureLevelRecordJsonConverter.write(record,
FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
}
break;
}
case NO_OP_RECORD: {
break;
}
default:
throw new RuntimeException("Unhandled metadata record type");
}

View File

@ -18,19 +18,31 @@
package org.apache.kafka.shell;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -256,6 +268,61 @@ public class MetadataNodeManagerTest {
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
}
@Test
public void testBrokerRegistrationChangeRecord() {
    // Register broker 1; a freshly registered broker starts out fenced.
    RegisterBrokerRecord registration = new RegisterBrokerRecord()
        .setBrokerId(1)
        .setBrokerEpoch(2);
    metadataNodeManager.handleMessage(registration);
    assertEquals("true",
        metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

    // An UNFENCE change flips isFenced to false.
    BrokerRegistrationChangeRecord unfenceChange = new BrokerRegistrationChangeRecord()
        .setBrokerId(1)
        .setBrokerEpoch(2)
        .setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
    metadataNodeManager.handleMessage(unfenceChange);
    assertEquals("false",
        metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

    // A FENCE change flips isFenced back to true.
    BrokerRegistrationChangeRecord fenceChange = new BrokerRegistrationChangeRecord()
        .setBrokerId(1)
        .setBrokerEpoch(2)
        .setFenced(BrokerRegistrationFencingChange.FENCE.value());
    metadataNodeManager.handleMessage(fenceChange);
    assertEquals("true",
        metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

    // A NONE fencing change leaves the current fencing state untouched.
    BrokerRegistrationChangeRecord noFencingChange = new BrokerRegistrationChangeRecord()
        .setBrokerId(1)
        .setBrokerEpoch(2)
        .setFenced(BrokerRegistrationFencingChange.NONE.value());
    metadataNodeManager.handleMessage(noFencingChange);
    assertEquals("true",
        metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

    // Entering controlled shutdown creates the inControlledShutdown file.
    BrokerRegistrationChangeRecord shutdownChange = new BrokerRegistrationChangeRecord()
        .setBrokerId(1)
        .setBrokerEpoch(2)
        .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
    metadataNodeManager.handleMessage(shutdownChange);
    assertEquals("true",
        metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());

    // A NONE controlled-shutdown change leaves the current state untouched.
    BrokerRegistrationChangeRecord noShutdownChange = new BrokerRegistrationChangeRecord()
        .setBrokerId(1)
        .setBrokerEpoch(2)
        .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.NONE.value());
    metadataNodeManager.handleMessage(noShutdownChange);
    assertEquals("true",
        metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
}
@Test
public void testClientQuotaRecord() {
ClientQuotaRecord record = new ClientQuotaRecord()
@ -336,4 +403,43 @@ public class MetadataNodeManagerTest {
11000 + "",
metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
}
@Test
public void testAccessControlEntryRecordAndRemoveAccessControlEntryRecord() {
    Uuid aclId = Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ");

    // Adding an ACL creates a file named after the ACL id under acl/id,
    // whose contents are the pretty-printed JSON form of the record.
    AccessControlEntryRecord addAcl = new AccessControlEntryRecord()
        .setId(aclId)
        .setHost("example.com")
        .setResourceType(ResourceType.GROUP.code())
        .setResourceName("group")
        .setOperation(AclOperation.READ.code())
        .setPermissionType(AclPermissionType.ALLOW.code())
        .setPrincipal("User:kafka")
        .setPatternType(PatternType.LITERAL.code());
    metadataNodeManager.handleMessage(addAcl);
    assertEquals(
        AccessControlEntryRecordJsonConverter.write(addAcl, AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
        metadataNodeManager.getData().root().directory("acl").directory("id").file(aclId.toString()).contents());

    // Removing the ACL deletes the corresponding file.
    RemoveAccessControlEntryRecord removeAcl = new RemoveAccessControlEntryRecord()
        .setId(aclId);
    metadataNodeManager.handleMessage(removeAcl);
    assertFalse(metadataNodeManager.getData().root().directory("acl").directory("id").children().containsKey(aclId.toString()));
}
@Test
public void testFeatureLevelRecord() {
    String featureName = "metadata.version";

    // A non-zero feature level creates a file under features/ holding the
    // pretty-printed JSON form of the record.
    FeatureLevelRecord enableFeature = new FeatureLevelRecord()
        .setName(featureName)
        .setFeatureLevel((short) 3);
    metadataNodeManager.handleMessage(enableFeature);
    assertEquals(
        FeatureLevelRecordJsonConverter.write(enableFeature, FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
        metadataNodeManager.getData().root().directory("features").file(featureName).contents());

    // A feature level of 0 means "not supported" and removes the file.
    FeatureLevelRecord disableFeature = new FeatureLevelRecord()
        .setName(featureName)
        .setFeatureLevel((short) 0);
    metadataNodeManager.handleMessage(disableFeature);
    assertFalse(metadataNodeManager.getData().root().directory("features").children().containsKey(featureName));
}
}