KIP-1073: Return fenced brokers in DescribeCluster response (#17524)

Implementation of KIP-1073: Return fenced brokers in DescribeCluster response.
Add new unit and integration tests for describeCluster.

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
Gantigmaa Selenge 2024-12-13 02:58:11 +00:00 committed by GitHub
parent d7c80a7257
commit 747dc172e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 252 additions and 18 deletions

View File

@ -29,6 +29,8 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
private boolean includeAuthorizedOperations;
private boolean includeFencedBrokers;
/**
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
@ -45,6 +47,11 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
return this;
}
/**
 * Specify whether fenced brokers should be included in the DescribeCluster response.
 * Requires version 2+ of the DescribeCluster API, and cannot be combined with
 * bootstrap controllers (the admin client rejects that combination client-side).
 *
 * @param includeFencedBrokers true to include fenced brokers in the result
 * @return this options instance, for chaining
 */
public DescribeClusterOptions includeFencedBrokers(boolean includeFencedBrokers) {
this.includeFencedBrokers = includeFencedBrokers;
return this;
}
/**
* Specify if authorized operations should be included in the response. Note that some
* older brokers cannot supply this information even if it is requested.
@ -52,4 +59,12 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
/**
 * Returns whether fenced brokers should be included in the response. Note that some
 * older brokers cannot supply this information even if it is requested.
 */
public boolean includeFencedBrokers() {
return includeFencedBrokers;
}
}

View File

@ -2504,10 +2504,14 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
if (!useMetadataRequest) {
if (metadataManager.usingBootstrapControllers() && options.includeFencedBrokers()) {
throw new IllegalArgumentException("Cannot request fenced brokers from controller endpoint");
}
return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
.setEndpointType(metadataManager.usingBootstrapControllers() ?
EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()));
EndpointType.CONTROLLER.id() : EndpointType.BROKER.id())
.setIncludeFencedBrokers(options.includeFencedBrokers()));
} else {
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
@ -2523,7 +2527,6 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
if (!useMetadataRequest) {
DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse;
Errors error = Errors.forCode(response.data().errorCode());
if (error != Errors.NONE) {
ApiError apiError = new ApiError(error, response.data().errorMessage());
@ -2571,6 +2574,12 @@ public class KafkaAdminClient extends AdminClient {
return false;
}
// If unsupportedVersion exception was caused by the option to include fenced brokers (only supported for version 2+)
// then we should not fall back to the metadataRequest.
if (options.includeFencedBrokers()) {
return false;
}
useMetadataRequest = true;
return true;
}

View File

@ -30,12 +30,13 @@ public class Node {
private final String host;
private final int port;
private final String rack;
private final boolean isFenced;
// Cache hashCode as it is called in performance sensitive parts of the code (e.g. RecordAccumulator.ready)
private Integer hash;
/**
 * Create a node with no rack; the node is treated as unfenced.
 *
 * @param id   the node id
 * @param host the node hostname
 * @param port the node port
 */
public Node(int id, String host, int port) {
    // Delegate to the canonical 5-arg constructor (rendered diff left the stale
    // 4-arg delegation in place alongside this one; only one this(...) call is legal).
    this(id, host, port, null, false);
}
public Node(int id, String host, int port, String rack) {
@ -44,6 +45,16 @@ public class Node {
this.host = host;
this.port = port;
this.rack = rack;
this.isFenced = false;
}
/**
 * Create a node.
 *
 * @param id       the node id
 * @param host     the node hostname
 * @param port     the node port
 * @param rack     the rack this node is on, or null if it has not been assigned to a rack
 * @param isFenced whether the node is currently fenced
 */
public Node(int id, String host, int port, String rack, boolean isFenced) {
this.id = id;
this.idString = Integer.toString(id);
this.host = host;
this.port = port;
this.rack = rack;
this.isFenced = isFenced;
}
public static Node noNode() {
@ -102,6 +113,13 @@ public class Node {
return rack;
}
/**
 * Whether this node is fenced.
 */
public boolean isFenced() {
return isFenced;
}
@Override
public int hashCode() {
Integer h = this.hash;
@ -110,6 +128,7 @@ public class Node {
result = 31 * result + id;
result = 31 * result + port;
result = 31 * result + ((rack == null) ? 0 : rack.hashCode());
result = 31 * result + Objects.hashCode(isFenced);
this.hash = result;
return result;
} else {
@ -127,12 +146,13 @@ public class Node {
return id == other.id &&
port == other.port &&
Objects.equals(host, other.host) &&
Objects.equals(rack, other.rack);
Objects.equals(rack, other.rack) &&
Objects.equals(isFenced, other.isFenced);
}
/**
 * Human-readable representation including host, port, id, rack and fenced state.
 * (The rendered diff left the pre-KIP-1073 return statement in place above the new
 * one — two returns is unreachable code; only the isFenced-including form is kept,
 * matching the expectation asserted in BrokerApiVersionsCommandTest.)
 */
@Override
public String toString() {
    return host + ":" + port + " (id: " + idString + " rack: " + rack + " isFenced: " + isFenced + ")";
}
}

View File

@ -39,7 +39,7 @@ public class DescribeClusterResponse extends AbstractResponse {
/**
 * Returns the brokers from the response, keyed by broker id.
 * isFenced is only populated by brokers on version 2+ of the API; older brokers
 * leave it at the schema default (false).
 */
public Map<Integer, Node> nodes() {
    // The rendered diff kept the old 4-arg Node mapping line alongside the new one;
    // only the 5-arg mapping (which carries isFenced) is valid here.
    return data.brokers().valuesList().stream()
        .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack(), b.isFenced()))
        .collect(Collectors.toMap(Node::id, Function.identity()));
}

View File

@ -20,13 +20,16 @@
"name": "DescribeClusterRequest",
//
// Version 1 adds EndpointType for KIP-919 support.
// Version 2 adds IncludeFencedBrokers for KIP-1073 support.
//
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include cluster authorized operations." },
{ "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
"about": "The endpoint type to describe. 1=brokers, 2=controllers." },
{ "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
"about": "Whether to include fenced brokers when listing brokers." }
]
}

View File

@ -20,8 +20,9 @@
//
// Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
// UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
// Version 2 adds IsFenced field to Brokers for KIP-1073 support.
//
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@ -45,7 +46,9 @@
{ "name": "Port", "type": "int32", "versions": "0+",
"about": "The broker port." },
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The rack of the broker, or null if it has not been assigned to a rack." },
{ "name": "IsFenced", "type": "bool", "versions": "2+",
"about": "Whether the broker is fenced." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this cluster." }

View File

@ -3166,6 +3166,23 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers() {
    // Advertise DescribeCluster up to version 1 only; includeFencedBrokers needs v2+.
    ApiVersion maxVersionOne = new ApiVersion()
        .setApiKey(ApiKeys.DESCRIBE_CLUSTER.id)
        .setMinVersion((short) 0)
        .setMaxVersion((short) 1);
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(maxVersionOne)));
        env.kafkaClient().prepareUnsupportedVersionResponse(DescribeClusterRequest.class::isInstance);
        DescribeClusterResult result = env.adminClient()
            .describeCluster(new DescribeClusterOptions().includeFencedBrokers(true));
        // The client must NOT fall back to MetadataRequest; the future fails instead.
        TestUtils.assertFutureThrows(result.nodes(), UnsupportedVersionException.class);
    }
}
@Test
public void testListConsumerGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),

View File

@ -3617,12 +3617,16 @@ class KafkaApis(val requestChannel: RequestChannel,
clusterId,
() => {
val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection()
metadataCache.getAliveBrokerNodes(request.context.listenerName).foreach { node =>
val describeClusterRequest = request.body[DescribeClusterRequest]
metadataCache.getBrokerNodes(request.context.listenerName).foreach { node =>
if (!node.isFenced || describeClusterRequest.data().includeFencedBrokers()) {
brokers.add(new DescribeClusterResponseData.DescribeClusterBroker().
setBrokerId(node.id).
setHost(node.host).
setPort(node.port).
setRack(node.rack))
setRack(node.rack).
setIsFenced(node.isFenced))
}
}
brokers
},

View File

@ -75,6 +75,8 @@ trait MetadataCache {
def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node]
/** Like getAliveBrokerNodes, but the KRaft implementation also includes fenced brokers
 *  (the ZK implementation simply delegates to the alive set). */
def getBrokerNodes(listenerName: ListenerName): Iterable[Node]
def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
/**

View File

@ -381,6 +381,10 @@ class KRaftMetadataCache(
flatMap(_.node(listenerName.value()).toScala).toSeq
}
/**
 * Returns broker nodes for the given listener from the current metadata image,
 * including fenced brokers; callers (e.g. KafkaApis DescribeCluster handling)
 * filter on Node.isFenced as needed.
 */
override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = {
_currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).asScala).toSeq
}
// Does NOT include offline replica metadata
override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
Option(_currentImage.topics().getTopic(topicName)).

View File

@ -353,6 +353,10 @@ class ZkMetadataCache(
metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName))
}
// In ZK mode this cache does not track fenced brokers, so "all brokers" is
// simply the alive set.
override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
getAliveBrokerNodes(listenerName)
}
def getTopicId(topicName: String): Uuid = {
metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID)
}

View File

@ -595,6 +595,30 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testListNodesWithFencedBroker(quorum: String): Unit = {
  client = createAdminClient
  val fencedBrokerId = brokers.last.config.brokerId
  killBroker(fencedBrokerId, JDuration.ofMillis(0))
  // It takes a few seconds for a broker to get fenced after being killed,
  // so retry until the default listing drops the killed broker (or the max wait is reached).
  TestUtils.retry(20000) {
    // assertEquals reports expected/actual on failure, unlike assertTrue(a.equals(b)).
    assertEquals(brokers.size - 1, client.describeCluster().nodes().get().asScala.size)
  }
  // List nodes again, this time including the fenced broker.
  val nodes = client.describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)).nodes().get().asScala
  assertEquals(brokers.size, nodes.size)
  nodes.foreach { node =>
    // Only the killed broker should be reported as fenced.
    if (node.id() == fencedBrokerId) {
      assertTrue(node.isFenced)
    } else {
      assertFalse(node.isFenced)
    }
  }
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {

View File

@ -18,7 +18,7 @@ import java.util.Properties
import com.yammer.metrics.core.Gauge
import kafka.security.JaasTestUtils
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult}
import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult, DescribeClusterOptions}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@ -32,7 +32,7 @@ import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -158,6 +158,25 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testListNodesFromControllersIncludingFencedBrokers(quorum: String): Unit = {
useBoostrapControllers()
client = createAdminClient
val result = client.describeCluster(new DescribeClusterOptions().includeFencedBrokers(true))
// includeFencedBrokers is rejected client-side when bootstrap controllers are in use
// (see KafkaAdminClient.createRequest); the message is nested two causes deep,
// hence getCause.getCause — NOTE(review): fragile if the wrapping changes.
val exception = assertThrows(classOf[Exception], () => { result.nodes().get()})
assertTrue(exception.getCause.getCause.getMessage.contains("Cannot request fenced brokers from controller endpoint"))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testListNodesFromControllers(quorum: String): Unit = {
  useBoostrapControllers()
  client = createAdminClient
  val result = client.describeCluster(new DescribeClusterOptions())
  // assertEquals reports expected/actual on failure, unlike assertTrue(a.equals(b)).
  assertEquals(controllerServers.size, result.nodes().get().size())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = {

View File

@ -233,7 +233,7 @@ public class BrokerRegistration {
if (endpoint == null) {
return Optional.empty();
}
return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null)));
return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced));
}
public Map<String, VersionRange> supportedFeatures() {

View File

@ -159,7 +159,7 @@ public class BrokerRegistrationTest {
assertEquals(Optional.empty(), REGISTRATIONS.get(0).node("NONEXISTENT"));
assertEquals(Optional.of(new Node(0, "localhost", 9090, null)),
REGISTRATIONS.get(0).node("INTERNAL"));
assertEquals(Optional.of(new Node(1, "localhost", 9091, null)),
assertEquals(Optional.of(new Node(1, "localhost", 9091, null, true)),
REGISTRATIONS.get(1).node("INTERNAL"));
assertEquals(Optional.of(new Node(2, "localhost", 9092, "myrack")),
REGISTRATIONS.get(2).node("INTERNAL"));

View File

@ -17,6 +17,8 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@ -31,11 +33,13 @@ import net.sourceforge.argparse4j.inf.Subparsers;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
public class ClusterTool {
@ -68,7 +72,9 @@ public class ClusterTool {
.help("Get information about the ID of a cluster.");
Subparser unregisterParser = subparsers.addParser("unregister")
.help("Unregister a broker.");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) {
Subparser listEndpoints = subparsers.addParser("list-endpoints")
.help("List endpoints");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) {
MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
@ -85,6 +91,9 @@ public class ClusterTool {
.action(store())
.required(true)
.help("The ID of the broker to unregister.");
listEndpoints.addArgument("--include-fenced-brokers")
.action(storeTrue())
.help("Whether to include fenced brokers when listing broker endpoints");
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
@ -108,6 +117,17 @@ public class ClusterTool {
}
break;
}
case "list-endpoints": {
try (Admin adminClient = Admin.create(properties)) {
boolean includeFencedBrokers = Optional.of(namespace.getBoolean("include_fenced_brokers")).orElse(false);
boolean listControllerEndpoints = namespace.getString("bootstrap_controller") != null;
if (includeFencedBrokers && listControllerEndpoints) {
throw new IllegalArgumentException("The option --include-fenced-brokers is only supported with --bootstrap-server option");
}
listEndpoints(System.out, adminClient, listControllerEndpoints, includeFencedBrokers);
}
break;
}
default:
throw new RuntimeException("Unknown command " + command);
}
@ -135,4 +155,44 @@ public class ClusterTool {
}
}
}
/**
 * Print the cluster endpoints returned by DescribeCluster as a fixed-width table.
 *
 * @param stream                  destination for the table
 * @param adminClient             client used to describe the cluster
 * @param listControllerEndpoints true when connected via bootstrap controllers (no STATE column)
 * @param includeFencedBrokers    whether fenced brokers should be requested
 * @throws Exception if the describe call fails for any reason other than an
 *                   unsupported API version (which is reported on the stream instead)
 */
static void listEndpoints(PrintStream stream, Admin adminClient, boolean listControllerEndpoints, boolean includeFencedBrokers) throws Exception {
    try {
        DescribeClusterOptions option = new DescribeClusterOptions().includeFencedBrokers(includeFencedBrokers);
        Collection<Node> nodes = adminClient.describeCluster(option).nodes().get();

        // Column widths track the longest host/rack so the table stays aligned.
        String hostWidth = String.valueOf(nodes.stream().map(n -> n.host().length()).max(Integer::compareTo).orElse(100));
        String rackWidth = String.valueOf(nodes.stream().filter(Node::hasRack).map(n -> n.rack().length()).max(Integer::compareTo).orElse(10));

        if (listControllerEndpoints) {
            String rowFormat = "%-10s %-" + hostWidth + "s %-10s %-" + rackWidth + "s %-15s%n";
            stream.printf(rowFormat, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE");
            for (Node node : nodes) {
                stream.printf(rowFormat, node.idString(), node.host(), node.port(), node.rack(), "controller");
            }
        } else {
            String rowFormat = "%-10s %-" + hostWidth + "s %-10s %-" + rackWidth + "s %-10s %-15s%n";
            stream.printf(rowFormat, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE");
            for (Node node : nodes) {
                String state = node.isFenced() ? "fenced" : "unfenced";
                stream.printf(rowFormat, node.idString(), node.host(), node.port(), node.rack(), state, "broker");
            }
        }
    } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof UnsupportedVersionException) {
            // Older brokers reject includeFencedBrokers; report the reason instead of crashing.
            stream.println(cause.getMessage());
        } else {
            throw ee;
        }
    }
}
}

View File

@ -54,7 +54,7 @@ public class BrokerApiVersionsCommandTest {
BrokerApiVersionsCommand.mainNoExit("--bootstrap-server", clusterInstance.bootstrapServers()));
Iterator<String> lineIter = Arrays.stream(output.split("\n")).iterator();
assertTrue(lineIter.hasNext());
assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null) -> (", lineIter.next());
assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null isFenced: false) -> (", lineIter.next());
ApiMessageType.ListenerType listenerType = ApiMessageType.ListenerType.BROKER;

View File

@ -29,8 +29,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -61,6 +64,34 @@ public class ClusterToolTest {
assertTrue(output.contains("Broker " + brokerId + " is no longer registered."));
}
@ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT})
public void testListEndpointsWithBootstrapServer(ClusterInstance clusterInstance) {
    // Capture the tool's stdout while listing broker endpoints.
    String output = ToolsTestUtils.captureStandardOut(() ->
        assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers())));

    int brokerId = clusterInstance.brokerIds().iterator().next();
    String brokerPort = clusterInstance.bootstrapServers().split(":")[1];
    // Header row followed by a single unfenced broker row.
    String rowFormat = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
    String expected = String.format(rowFormat,
        "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE",
        brokerId, "localhost", brokerPort, "null", "unfenced", "broker");
    assertEquals(expected, output);
}
@ClusterTest(brokers = 2, types = {Type.KRAFT, Type.CO_KRAFT})
public void testListEndpointsArgumentWithBootstrapServer(ClusterInstance clusterInstance) {
    // Shut one broker down so it becomes fenced, then list endpoints including fenced brokers.
    List<Integer> ids = clusterInstance.brokerIds().stream().collect(Collectors.toList());
    clusterInstance.shutdownBroker(ids.get(0));
    List<String> ports = Arrays.stream(clusterInstance.bootstrapServers().split(","))
        .map(endpoint -> endpoint.split(":")[1])
        .collect(Collectors.toList());

    // Header row, then the fenced (killed) broker, then the remaining unfenced broker.
    String rowFormat = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
    String expected = String.format(rowFormat,
        "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE",
        ids.get(0), "localhost", ports.get(0), "null", "fenced", "broker",
        ids.get(1), "localhost", ports.get(1), "null", "unfenced", "broker");

    String output = ToolsTestUtils.captureStandardOut(() ->
        assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers(), "--include-fenced-brokers")));
    assertEquals(expected, output);
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testClusterIdWithBootstrapController(ClusterInstance clusterInstance) {
String output = ToolsTestUtils.captureStandardOut(() ->
@ -83,6 +114,25 @@ public class ClusterToolTest {
"the controller quorum.", exception.getCause().getMessage());
}
@ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})
public void testListEndpointsWithBootstrapController(ClusterInstance clusterInstance) {
    String output = ToolsTestUtils.captureStandardOut(() ->
        assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-controller", clusterInstance.bootstrapControllers())));
    String port = clusterInstance.bootstrapControllers().split(":")[1];
    int id = clusterInstance.controllerIds().iterator().next();
    // Controller listing has no STATE column; header row then one controller row.
    String format = "%-10s %-9s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s";
    String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE", id, "localhost", port, "null", "controller");
    // assertEquals reports the expected/actual diff on failure, unlike assertTrue(output.equals(expected)).
    assertEquals(expected, output);
}
@ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})
public void testListEndpointsArgumentWithBootstrapController(ClusterInstance clusterInstance) {
    // --include-fenced-brokers cannot be combined with --bootstrap-controller.
    RuntimeException exception = assertThrows(RuntimeException.class, () ->
        ClusterTool.execute("list-endpoints", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--include-fenced-brokers"));
    assertEquals("The option --include-fenced-brokers is only supported with --bootstrap-server option", exception.getMessage());
}
@Test
public void testPrintClusterId() throws Exception {
Admin adminClient = new MockAdminClient.Builder().