MINOR: Refactor KafkaAdminClientTest to reduce the boilerplate code (#7842)

`KafkaAdminClientTest` contains many code repetitions that could be removed. This PR removes most of the boilerplate code.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2019-12-18 07:08:53 +01:00 committed by Jason Gustafson
parent fd7991ae23
commit 75a68341da
1 changed file with 51 additions and 279 deletions

View File

@ -235,11 +235,10 @@ public class KafkaAdminClientTest {
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
}
private static Cluster mockCluster(int controllerIndex) {
private static Cluster mockCluster(int numNodes, int controllerIndex) {
HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
nodes.put(1, new Node(1, "localhost", 8122));
nodes.put(2, new Node(2, "localhost", 8123));
for (int i = 0; i < numNodes; i++)
nodes.put(i, new Node(i, "localhost", 8121 + i));
return new Cluster("mockClusterId", nodes.values(),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(controllerIndex));
@ -251,7 +250,7 @@ public class KafkaAdminClientTest {
}
private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
return new AdminClientUnitTestEnv(mockCluster(0), configVals);
return new AdminClientUnitTestEnv(mockCluster(3, 0), configVals);
}
@Test
@ -357,7 +356,7 @@ public class KafkaAdminClientTest {
Cluster cluster = mockBootstrapCluster();
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
Cluster discoveredCluster = mockCluster(0);
Cluster discoveredCluster = mockCluster(3, 0);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
@ -383,7 +382,7 @@ public class KafkaAdminClientTest {
Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L);
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
Cluster discoveredCluster = mockCluster(0);
Cluster discoveredCluster = mockCluster(3, 0);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
MetadataResponse.prepareResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
@ -404,12 +403,12 @@ public class KafkaAdminClientTest {
*/
@Test
public void testPropagatedMetadataFetchException() throws Exception {
Cluster cluster = mockCluster(0);
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM,
mockCluster(3, 0),
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
env.kafkaClient().createPendingAuthenticationError(env.cluster().nodeById(0),
TimeUnit.DAYS.toMillis(1));
env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
@ -434,11 +433,11 @@ public class KafkaAdminClientTest {
@Test
public void testCreateTopicsRetryBackoff() throws Exception {
Cluster cluster = mockCluster(0);
MockTime time = new MockTime();
int retryBackoff = 100;
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
mockCluster(3, 0),
newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
MockClient mockClient = env.kafkaClient();
@ -563,7 +562,7 @@ public class KafkaAdminClientTest {
String topic = "topic";
Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
Cluster initializedCluster = mockCluster(0);
Cluster initializedCluster = mockCluster(3, 0);
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
@ -830,13 +829,9 @@ public class KafkaAdminClientTest {
@Ignore // The test is flaky. Should be renabled when this JIRA is fixed: https://issues.apache.org/jira/browse/KAFKA-5792
@Test
public void testHandleTimeout() throws Exception {
HashMap<Integer, Node> nodes = new HashMap<>();
MockTime time = new MockTime();
nodes.put(0, new Node(0, "localhost", 8121));
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
mockCluster(1, 0),
AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@ -948,11 +943,10 @@ public class KafkaAdminClientTest {
TopicPartition tp0 = new TopicPartition(topic, 0);
TopicPartition tp1 = new TopicPartition(topic, 1);
Cluster cluster = mockCluster(0);
MockTime time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
List<Node> nodes = cluster.nodes();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0))) {
List<Node> nodes = env.cluster().nodes();
List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
@ -964,8 +958,8 @@ public class KafkaAdminClientTest {
List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, partitionMetadata));
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(cluster.nodes(),
cluster.clusterResource().clusterId(), cluster.controller().id(), topicMetadata));
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
env.cluster().clusterResource().clusterId(), env.cluster().controller().id(), topicMetadata));
Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> deletedPartitions = new HashMap<>();
deletedPartitions.put(tp0, new DeleteRecordsResponse.PartitionResponse(3, Errors.NONE));
@ -986,7 +980,6 @@ public class KafkaAdminClientTest {
@Test
public void testDeleteRecords() throws Exception {
HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
List<PartitionInfo> partitionInfos = new ArrayList<>();
@ -1092,29 +1085,13 @@ public class KafkaAdminClientTest {
@Test
public void testDescribeCluster() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
Node node3 = new Node(3, "localhost", 8124);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
nodes.put(3, node3);
final Cluster cluster = new Cluster(
"mockClusterId",
nodes.values(),
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Prepare the metadata response used for the first describe cluster
MetadataResponse response = MetadataResponse.prepareResponse(0,
new ArrayList<>(nodes.values()),
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
Collections.emptyList(),
@ -1123,7 +1100,7 @@ public class KafkaAdminClientTest {
// Prepare the metadata response used for the second describe cluster
MetadataResponse response2 = MetadataResponse.prepareResponse(0,
new ArrayList<>(nodes.values()),
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
3,
Collections.emptyList(),
@ -1147,24 +1124,8 @@ public class KafkaAdminClientTest {
@Test
public void testListConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
Node node3 = new Node(3, "localhost", 8124);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
nodes.put(3, node3);
final Cluster cluster = new Cluster(
"mockClusterId",
nodes.values(),
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata response should be retried
@ -1194,7 +1155,7 @@ public class KafkaAdminClientTest {
.setGroupId("group-connect-1")
.setProtocolType("connector")
))),
node0);
env.cluster().nodeById(0));
// handle retriable errors
env.kafkaClient().prepareResponseFrom(
@ -1203,14 +1164,14 @@ public class KafkaAdminClientTest {
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setGroups(Collections.emptyList())
),
node1);
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setGroups(Collections.emptyList())
),
node1);
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
@ -1223,7 +1184,7 @@ public class KafkaAdminClientTest {
.setGroupId("group-connect-2")
.setProtocolType("connector")
))),
node1);
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
@ -1237,7 +1198,7 @@ public class KafkaAdminClientTest {
.setGroupId("group-connect-3")
.setProtocolType("connector")
))),
node2);
env.cluster().nodeById(2));
// fatal error
env.kafkaClient().prepareResponseFrom(
@ -1245,7 +1206,7 @@ public class KafkaAdminClientTest {
new ListGroupsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setGroups(Collections.emptyList())),
node3);
env.cluster().nodeById(3));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
TestUtils.assertFutureError(result.all(), UnknownServerException.class);
@ -1265,20 +1226,7 @@ public class KafkaAdminClientTest {
@Test
public void testListConsumerGroupsMetadataFailure() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
final Cluster cluster = new Cluster(
"mockClusterId",
nodes.values(),
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
@ -1301,18 +1249,7 @@ public class KafkaAdminClientTest {
@Test
public void testDescribeConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//Retriable FindCoordinatorResponse errors should be retried
@ -1407,18 +1344,7 @@ public class KafkaAdminClientTest {
@Test
public void testDescribeMultipleConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@ -1476,18 +1402,7 @@ public class KafkaAdminClientTest {
@Test
public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -1514,18 +1429,7 @@ public class KafkaAdminClientTest {
@Test
public void testDescribeNonConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@ -1551,18 +1455,7 @@ public class KafkaAdminClientTest {
@Test
public void testDescribeConsumerGroupOffsets() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Retriable FindCoordinatorResponse errors should be retried
@ -1607,20 +1500,9 @@ public class KafkaAdminClientTest {
@Test
public void testDeleteConsumerGroups() throws Exception {
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final List<String> groupIds = singletonList("group-0");
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//Retriable FindCoordinatorResponse errors should be retried
@ -1702,23 +1584,12 @@ public class KafkaAdminClientTest {
public void testDeleteConsumerGroupOffsets() throws Exception {
// Happy path
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
final TopicPartition tp2 = new TopicPartition("bar", 0);
final TopicPartition tp3 = new TopicPartition("foobar", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -1759,21 +1630,10 @@ public class KafkaAdminClientTest {
public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
// Retriable errors should be retried
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -1811,23 +1671,12 @@ public class KafkaAdminClientTest {
public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception {
// Non-retriable errors throw an exception
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
final List<Errors> nonRetriableErrors = Arrays.asList(
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
for (Errors error : nonRetriableErrors) {
@ -1850,21 +1699,10 @@ public class KafkaAdminClientTest {
public void testDeleteConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws Exception {
// Retriable FindCoordinatorResponse errors should be retried
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -1890,21 +1728,10 @@ public class KafkaAdminClientTest {
public void testDeleteConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws Exception {
// Non-retriable FindCoordinatorResponse errors throw an exception
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -2278,23 +2105,12 @@ public class KafkaAdminClientTest {
public void testAlterConsumerGroupOffsets() throws Exception {
// Happy path
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
final TopicPartition tp2 = new TopicPartition("bar", 0);
final TopicPartition tp3 = new TopicPartition("foobar", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -2322,21 +2138,10 @@ public class KafkaAdminClientTest {
public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception {
// Retriable errors should be retried
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -2377,23 +2182,12 @@ public class KafkaAdminClientTest {
public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception {
// Non-retriable errors throw an exception
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
final List<Errors> nonRetriableErrors = Arrays.asList(
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
for (Errors error : nonRetriableErrors) {
@ -2417,21 +2211,10 @@ public class KafkaAdminClientTest {
public void testAlterConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws Exception {
// Retriable FindCoordinatorResponse errors should be retried
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
@ -2459,21 +2242,10 @@ public class KafkaAdminClientTest {
public void testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws Exception {
// Non-retriable FindCoordinatorResponse errors throw an exception
final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(