diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 78a7f905319..5191b62d4f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3106,37 +3106,57 @@ public class KafkaAdminClient extends AdminClient { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; - for (Map.Entry responseEntry : logDirDescriptions(response).entrySet()) { + + Set pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet()); + Map directoryFailures = new HashMap<>(); + Map descriptions = logDirDescriptions(response); + if (descriptions.isEmpty()) { + Errors error = response.data().errorCode() == Errors.NONE.code() + ? Errors.CLUSTER_AUTHORIZATION_FAILED + : Errors.forCode(response.data().errorCode()); + handleFailure(error.exception(), pendingPartitions); + } + + for (Map.Entry responseEntry : descriptions.entrySet()) { String logDir = responseEntry.getKey(); LogDirDescription logDirInfo = responseEntry.getValue(); // No replica info will be provided if the log directory is offline if (logDirInfo.error() instanceof KafkaStorageException) continue; - if (logDirInfo.error() != null) - handleFailure(new IllegalStateException( - "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); - for (Map.Entry replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { - TopicPartition tp = replicaInfoEntry.getKey(); - ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); - ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); - if (replicaLogDirInfo == null) { - log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); - } else if (replicaInfo.isFuture()) { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), - replicaLogDirInfo.getCurrentReplicaOffsetLag(), - logDir, - replicaInfo.offsetLag())); - } else { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, - replicaInfo.offsetLag(), - replicaLogDirInfo.getFutureReplicaLogDir(), - replicaLogDirInfo.getFutureReplicaOffsetLag())); + if (logDirInfo.error() == null) { + for (Map.Entry replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { + TopicPartition tp = replicaInfoEntry.getKey(); + ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); + ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); + if (replicaLogDirInfo == null) { + log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); + } else if (replicaInfo.isFuture()) { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), + replicaLogDirInfo.getCurrentReplicaOffsetLag(), + logDir, + replicaInfo.offsetLag())); + } else { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, + replicaInfo.offsetLag(), + replicaLogDirInfo.getFutureReplicaLogDir(), + replicaLogDirInfo.getFutureReplicaOffsetLag())); + } + pendingPartitions.remove(tp); } + } else { + directoryFailures.put(logDir, logDirInfo.error()); } } + if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) { + List errorAtDir = new ArrayList<>(); + directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k)); + Throwable error = new IllegalStateException("The error " + String.join(", ", errorAtDir) + " in the response from broker " + brokerId + " is illegal"); + handleFailure(error, pendingPartitions); + } + for (Map.Entry entry : replicaDirInfoByPartition.entrySet()) { TopicPartition tp = entry.getKey(); KafkaFutureImpl future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId)); @@ -3148,6 +3168,13 @@ public class KafkaAdminClient extends AdminClient { void handleFailure(Throwable throwable) { completeAllExceptionally(futures.values(), throwable); } + + void handleFailure(Throwable throwable, Collection topicPartitions) { + for (TopicPartition tp: topicPartitions) { + KafkaFutureImpl future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId)); + future.completeExceptionally(throwable); + } + } }, now); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 9084a25836e..bffb21986dd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -309,6 +309,7 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.admin.KafkaAdminClient.DEFAULT_LEAVE_GROUP_REASON; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -869,7 +870,7 @@ public class KafkaAdminClientTest { KafkaFuture future = env.adminClient().createTopics( singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); - TestUtils.assertFutureThrows(TimeoutException.class, future); + assertFutureThrows(TimeoutException.class, future); } } @@ -952,7 +953,7 @@ public class KafkaAdminClientTest { KafkaFuture future = env.adminClient().createTopics( singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); - TestUtils.assertFutureThrows(SaslAuthenticationException.class, future); + assertFutureThrows(SaslAuthenticationException.class, future); } } @@ -982,7 +983,7 @@ public class KafkaAdminClientTest { new NewTopic("myTopic2", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)); topicsResult.values().get("myTopic").get(); - TestUtils.assertFutureThrows(ApiException.class, topicsResult.values().get("myTopic2")); + assertFutureThrows(ApiException.class, topicsResult.values().get("myTopic2")); } } @@ -1086,7 +1087,7 @@ public class KafkaAdminClientTest { assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1131,9 +1132,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1157,9 +1158,9 @@ public class KafkaAdminClientTest { new CreateTopicsOptions().retryOnQuotaViolation(false)); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1194,14 +1195,14 @@ public class KafkaAdminClientTest { prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(TopicDeletionDisabledException.class, future); + assertFutureThrows(TopicDeletionDisabledException.class, future); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopics("myTopic"), prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)); future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, future); + assertFutureThrows(UnknownTopicOrPartitionException.class, future); // With topic IDs Uuid topicId = Uuid.randomUuid(); @@ -1218,14 +1219,14 @@ public class KafkaAdminClientTest { prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(TopicCollection.ofTopicIds(singletonList(topicId)), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(TopicDeletionDisabledException.class, future); + assertFutureThrows(TopicDeletionDisabledException.class, future); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopicIds(topicId), prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID)); future = env.adminClient().deleteTopics(TopicCollection.ofTopicIds(singletonList(topicId)), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, future); + assertFutureThrows(UnknownTopicIdException.class, future); } } @@ -1244,7 +1245,7 @@ public class KafkaAdminClientTest { asList("myTopic", "myOtherTopic"), new DeleteTopicsOptions()); result.topicNameValues().get("myTopic").get(); - TestUtils.assertFutureThrows(ApiException.class, result.topicNameValues().get("myOtherTopic")); + assertFutureThrows(ApiException.class, result.topicNameValues().get("myOtherTopic")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1258,7 +1259,7 @@ public class KafkaAdminClientTest { TopicCollection.ofTopicIds(asList(topicId1, topicId2)), new DeleteTopicsOptions()); resultIds.topicIdValues().get(topicId1).get(); - TestUtils.assertFutureThrows(ApiException.class, resultIds.topicIdValues().get(topicId2)); + assertFutureThrows(ApiException.class, resultIds.topicIdValues().get(topicId2)); } } @@ -1290,7 +1291,7 @@ public class KafkaAdminClientTest { assertNull(result.topicNameValues().get("topic1").get()); assertNull(result.topicNameValues().get("topic2").get()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1320,7 +1321,7 @@ public class KafkaAdminClientTest { assertNull(resultIds.topicIdValues().get(topicId1).get()); assertNull(resultIds.topicIdValues().get(topicId2).get()); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); + assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1362,9 +1363,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.topicNameValues().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1398,9 +1399,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(resultIds.topicIdValues().get(topicId1).get()); - e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); + e = assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); + assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1421,9 +1422,9 @@ public class KafkaAdminClientTest { new DeleteTopicsOptions().retryOnQuotaViolation(false)); assertNull(result.topicNameValues().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1441,9 +1442,9 @@ public class KafkaAdminClientTest { new DeleteTopicsOptions().retryOnQuotaViolation(false)); assertNull(resultIds.topicIdValues().get(topicId1).get()); - e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); + e = assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); + assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1475,14 +1476,14 @@ public class KafkaAdminClientTest { List sillyTopicNames = asList("", null); Map> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(InvalidTopicException.class, deleteFutures.get(sillyTopicName)); + assertFutureThrows(InvalidTopicException.class, deleteFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); Map> describeFutures = env.adminClient().describeTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(InvalidTopicException.class, describeFutures.get(sillyTopicName)); + assertFutureThrows(InvalidTopicException.class, describeFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); @@ -1493,7 +1494,7 @@ public class KafkaAdminClientTest { Map> createFutures = env.adminClient().createTopics(newTopics).values(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(InvalidTopicException.class, createFutures.get(sillyTopicName)); + assertFutureThrows(InvalidTopicException.class, createFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); } @@ -1789,7 +1790,7 @@ public class KafkaAdminClientTest { asList(topicName1, topicName0), new DescribeTopicsOptions() ); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.allTopicNames()); + assertFutureThrows(TopicAuthorizationException.class, result.allTopicNames()); } } @@ -1888,10 +1889,10 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeAclsResponse(new DescribeAclsResponseData() .setErrorCode(Errors.SECURITY_DISABLED.code()) .setErrorMessage("Security is disabled"), ApiKeys.DESCRIBE_ACLS.latestVersion())); - TestUtils.assertFutureThrows(SecurityDisabledException.class, env.adminClient().describeAcls(FILTER2).values()); + assertFutureThrows(SecurityDisabledException.class, env.adminClient().describeAcls(FILTER2).values()); // Test a call where we supply an invalid filter. - TestUtils.assertFutureThrows(InvalidRequestException.class, env.adminClient().describeAcls(UNKNOWN_FILTER).values()); + assertFutureThrows(InvalidRequestException.class, env.adminClient().describeAcls(UNKNOWN_FILTER).values()); } } @@ -1983,9 +1984,9 @@ public class KafkaAdminClientTest { new CreateAclsResponseData.AclCreationResult())))); results = env.adminClient().createAcls(asList(ACL1, ACL2)); assertCollectionIs(results.values().keySet(), ACL1, ACL2); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.values().get(ACL1)); + assertFutureThrows(SecurityDisabledException.class, results.values().get(ACL1)); results.values().get(ACL2).get(); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); + assertFutureThrows(SecurityDisabledException.class, results.all()); } } @@ -2013,8 +2014,8 @@ public class KafkaAdminClientTest { assertEquals(ACL1, filter1Results.values().get(0).binding()); assertNull(filter1Results.values().get(1).exception()); assertEquals(ACL2, filter1Results.values().get(1).binding()); - TestUtils.assertFutureThrows(SecurityDisabledException.class, filterResults.get(FILTER2)); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); + assertFutureThrows(SecurityDisabledException.class, filterResults.get(FILTER2)); + assertFutureThrows(SecurityDisabledException.class, results.all()); // Test a call where one deletion result has an error. env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() @@ -2034,7 +2035,7 @@ public class KafkaAdminClientTest { ApiKeys.DELETE_ACLS.latestVersion())); results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2)); assertTrue(results.values().get(FILTER2).get().values().isEmpty()); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); + assertFutureThrows(SecurityDisabledException.class, results.all()); // Test a call where there are no errors. env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() @@ -2105,7 +2106,7 @@ public class KafkaAdminClientTest { electionType, Set.of(topic1, topic2), new ElectLeadersOptions().timeoutMs(100)); - TestUtils.assertFutureThrows(TimeoutException.class, results.partitions()); + assertFutureThrows(TimeoutException.class, results.partitions()); } } } @@ -2170,7 +2171,7 @@ public class KafkaAdminClientTest { topic2)).values(); assertEquals(Set.of(topic, topic2), result.keySet()); result.get(topic); - TestUtils.assertFutureThrows(ApiException.class, result.get(topic2)); + assertFutureThrows(ApiException.class, result.get(topic2)); } } @@ -2517,6 +2518,53 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())); + env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(List.of(tpr)); + Map> values = result.values(); + + assertFutureThrows(ClusterAuthorizationException.class, values.get(tpr)); + } + + } + + @Test + public void testDescribeReplicaLogDirsWithSingleDirException() throws ExecutionException, InterruptedException { + int brokerId = 1; + TopicPartitionReplica successfulTpr = new TopicPartitionReplica("topic", 12, brokerId); + TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 12, brokerId); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + String broker1log1 = "/var/data/kafka1"; + int broker1Log0PartitionSize = 987654321; + int broker1Log0OffsetLag = 24; + + DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult = prepareDescribeLogDirsResult( + successfulTpr, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false); + DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code()) + .setLogDir(broker1log1); + DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(List.of(successfulResult, failedResult))); + env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(successfulTpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(List.of(successfulTpr, failedTpr)); + Map> values = result.values(); + + assertNotNull(values.get(successfulTpr).get()); + Throwable e = assertFutureThrows(IllegalStateException.class, values.get(failedTpr)); + assertTrue(e.getMessage().equals("The error org.apache.kafka.common.errors.LogDirNotFoundException at " + broker1log1 + " in the response from broker " + brokerId + " is illegal")); + } + } + @Test public void testCreatePartitions() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { @@ -2578,7 +2626,7 @@ public class KafkaAdminClientTest { assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2624,9 +2672,9 @@ public class KafkaAdminClientTest { time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2651,9 +2699,9 @@ public class KafkaAdminClientTest { counts, new CreatePartitionsOptions().retryOnQuotaViolation(false)); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2688,7 +2736,7 @@ public class KafkaAdminClientTest { recordsToDelete.put(partition, RecordsToDelete.beforeOffset(10L)); DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, results.lowWatermarks().get(partition)); + assertFutureThrows(TopicAuthorizationException.class, results.lowWatermarks().get(partition)); } } @@ -2734,7 +2782,7 @@ public class KafkaAdminClientTest { DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); assertEquals(3L, results.lowWatermarks().get(tp0).get().lowWatermark()); - TestUtils.assertFutureThrows(SaslAuthenticationException.class, results.lowWatermarks().get(tp1)); + assertFutureThrows(SaslAuthenticationException.class, results.lowWatermarks().get(tp1)); } } @@ -2856,13 +2904,13 @@ public class KafkaAdminClientTest { DescribeTopicsResult result1 = env.adminClient().describeTopics( TopicCollection.ofTopicIds(singletonList(nonExistID))); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, result1.allTopicIds()); + assertFutureThrows(UnknownTopicIdException.class, result1.allTopicIds()); Exception e = assertThrows(Exception.class, () -> result1.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); assertEquals(String.format("org.apache.kafka.common.errors.UnknownTopicIdException: TopicId %s not found.", nonExistID), e.getMessage()); DescribeTopicsResult result2 = env.adminClient().describeTopics( TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID))); - TestUtils.assertFutureThrows(InvalidTopicException.class, result2.allTopicIds()); + assertFutureThrows(InvalidTopicException.class, result2.allTopicIds()); e = assertThrows(Exception.class, () -> result2.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); assertEquals("The given topic id 'AAAAAAAAAAAAAAAAAAAAAA' cannot be represented in a request.", e.getCause().getMessage()); @@ -2924,10 +2972,10 @@ public class KafkaAdminClientTest { .setErrorMessage(errorMessage))); final DescribeClusterResult result = env.adminClient().describeCluster(); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.clusterId(), errorMessage); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.controller(), errorMessage); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.nodes(), errorMessage); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.authorizedOperations(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.clusterId(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.controller(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.nodes(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.authorizedOperations(), errorMessage); } } @@ -3088,7 +3136,7 @@ public class KafkaAdminClientTest { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(6, listings.size()); @@ -3122,7 +3170,7 @@ public class KafkaAdminClientTest { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -3279,7 +3327,7 @@ public class KafkaAdminClientTest { ListGroupsOptions options = new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)); ListGroupsResult result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); // But a type filter which is just classic groups is permitted with an older broker, because they // only know about classic groups so the types filter can be omitted. @@ -3314,7 +3362,7 @@ public class KafkaAdminClientTest { options = new ListGroupsOptions().withTypes(Set.of(GroupType.CONSUMER)); result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3331,7 +3379,7 @@ public class KafkaAdminClientTest { request -> request instanceof DescribeClusterRequest); final DescribeClusterResult result = env.adminClient().describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.nodes()); + assertFutureThrows(UnsupportedVersionException.class, result.nodes()); } } @@ -3428,7 +3476,7 @@ public class KafkaAdminClientTest { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(3, listings.size()); @@ -3463,7 +3511,7 @@ public class KafkaAdminClientTest { List.of())); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -3636,7 +3684,7 @@ public class KafkaAdminClientTest { options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3682,7 +3730,7 @@ public class KafkaAdminClientTest { options = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.SHARE)); result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3780,7 +3828,7 @@ public class KafkaAdminClientTest { env.cluster().nodeById(3)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(3, listings.size()); @@ -3816,7 +3864,7 @@ public class KafkaAdminClientTest { List.of())); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -3953,7 +4001,7 @@ public class KafkaAdminClientTest { options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); result = env.adminClient().listConsumerGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3999,7 +4047,7 @@ public class KafkaAdminClientTest { options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.SHARE)); result = env.adminClient().listConsumerGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); // But a type filter which is just classic groups is permitted with an older broker, because they // only know about classic groups so the types filter can be omitted. @@ -4058,7 +4106,7 @@ public class KafkaAdminClientTest { offsets.put(tp1, new OffsetAndMetadata(123L)); final AlterConsumerGroupOffsetsResult result = env.adminClient().alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4081,7 +4129,7 @@ public class KafkaAdminClientTest { offsets.put(tp1, new OffsetAndMetadata(123L)); final AlterStreamsGroupOffsetsResult result = env.adminClient().alterStreamsGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4112,9 +4160,9 @@ public class KafkaAdminClientTest { .alterConsumerGroupOffsets(GROUP_ID, offsets); assertNull(result.partitionResult(foo0).get()); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); } } @@ -4145,9 +4193,9 @@ public class KafkaAdminClientTest { .alterStreamsGroupOffsets(GROUP_ID, offsets); assertNull(result.partitionResult(foo0).get()); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); } } @@ -4275,7 +4323,7 @@ public class KafkaAdminClientTest { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4639,7 +4687,7 @@ public class KafkaAdminClientTest { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.describedGroups().get(GROUP_ID)); + assertFutureThrows(IllegalArgumentException.class, result.describedGroups().get(GROUP_ID)); } } @@ -4660,7 +4708,7 @@ public class KafkaAdminClientTest { request -> request instanceof DescribeGroupsRequest); DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.describedGroups().get(GROUP_ID)); + assertFutureThrows(UnsupportedVersionException.class, result.describedGroups().get(GROUP_ID)); } } @@ -4864,7 +4912,7 @@ public class KafkaAdminClientTest { final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureThrows(TimeoutException.class, result.partitionsToOffsetAndMetadata()); + assertFutureThrows(TimeoutException.class, result.partitionsToOffsetAndMetadata()); } } @@ -4971,7 +5019,7 @@ public class KafkaAdminClientTest { ListConsumerGroupOffsetsResult errorResult = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionsToOffsetAndMetadata()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionsToOffsetAndMetadata()); } } } @@ -5345,7 +5393,7 @@ public class KafkaAdminClientTest { final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5372,7 +5420,7 @@ public class KafkaAdminClientTest { final DeleteStreamsGroupsResult result = env.adminClient().deleteStreamsGroups(groupIds); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5530,7 +5578,7 @@ public class KafkaAdminClientTest { prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); + assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); // Retriable errors should be retried env.kafkaClient().prepareResponse( @@ -5626,7 +5674,7 @@ public class KafkaAdminClientTest { prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); DeleteStreamsGroupsResult errorResult = env.adminClient().deleteStreamsGroups(groupIds); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); + assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); // Retriable errors should be retried env.kafkaClient().prepareResponse( @@ -5796,7 +5844,7 @@ public class KafkaAdminClientTest { final DeleteConsumerGroupOffsetsResult result = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5818,7 +5866,7 @@ public class KafkaAdminClientTest { final DeleteStreamsGroupOffsetsResult result = env.adminClient() .deleteStreamsGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5957,8 +6005,8 @@ public class KafkaAdminClientTest { GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); assertNull(errorResult.partitionResult(tp1).get()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3)); } } @@ -6002,8 +6050,8 @@ public class KafkaAdminClientTest { GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); assertNull(errorResult.partitionResult(tp1).get()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3)); } } @@ -6120,8 +6168,8 @@ public class KafkaAdminClientTest { DeleteConsumerGroupOffsetsResult errorResult = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -6147,8 +6195,8 @@ public class KafkaAdminClientTest { DeleteStreamsGroupOffsetsResult errorResult = env.adminClient() .deleteStreamsGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -6224,8 +6272,8 @@ public class KafkaAdminClientTest { final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -6244,8 +6292,8 @@ public class KafkaAdminClientTest { final DeleteStreamsGroupOffsetsResult errorResult = env.adminClient() .deleteStreamsGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -6561,7 +6609,7 @@ public class KafkaAdminClientTest { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(4, listings.size()); @@ -6596,7 +6644,7 @@ public class KafkaAdminClientTest { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -6655,7 +6703,7 @@ public class KafkaAdminClientTest { .setGroupId("streams-group-1")))), env.cluster().nodeById(0)); ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -6971,7 +7019,7 @@ public class KafkaAdminClientTest { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(4, listings.size()); @@ -7006,7 +7054,7 @@ public class KafkaAdminClientTest { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -7065,7 +7113,7 @@ public class KafkaAdminClientTest { .setGroupId("share-group-1")))), env.cluster().nodeById(0)); ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -7301,10 +7349,10 @@ public class KafkaAdminClientTest { configs.put(groupResource, singletonList(alterConfigOp4)); AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs); - TestUtils.assertFutureThrows(ClusterAuthorizationException.class, result.values().get(brokerResource)); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(topicResource)); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(metricResource)); - TestUtils.assertFutureThrows(InvalidConfigurationException.class, result.values().get(groupResource)); + assertFutureThrows(ClusterAuthorizationException.class, result.values().get(brokerResource)); + assertFutureThrows(InvalidRequestException.class, result.values().get(topicResource)); + assertFutureThrows(InvalidRequestException.class, result.values().get(metricResource)); + assertFutureThrows(InvalidConfigurationException.class, result.values().get(groupResource)); // Test a call where there are no errors. responseData = new IncrementalAlterConfigsResponseData(); @@ -7398,7 +7446,7 @@ public class KafkaAdminClientTest { final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -7534,8 +7582,8 @@ public class KafkaAdminClientTest { final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - TestUtils.assertFutureThrows(error.exception().getClass(), result.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), result.memberResult(memberToRemove)); + assertFutureThrows(error.exception().getClass(), result.all()); + assertFutureThrows(error.exception().getClass(), result.memberResult(memberToRemove)); } } } @@ -7570,8 +7618,8 @@ public class KafkaAdminClientTest { MemberToRemove memberOne = new MemberToRemove(instanceOne); MemberToRemove memberTwo = new MemberToRemove(instanceTwo); - TestUtils.assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberOne)); - TestUtils.assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberTwo)); + assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberOne)); + assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberTwo)); MemberResponse responseOne = new MemberResponse() .setGroupInstanceId(instanceOne) @@ -7592,8 +7640,8 @@ public class KafkaAdminClientTest { new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); - TestUtils.assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.all()); - TestUtils.assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.memberResult(memberOne)); + assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.all()); + assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.memberResult(memberOne)); assertNull(memberLevelErrorResult.memberResult(memberTwo).get()); // Return with missing member. @@ -7607,9 +7655,9 @@ public class KafkaAdminClientTest { new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); - TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.all()); + assertFutureThrows(IllegalArgumentException.class, missingMemberResult.all()); // The memberOne was not included in the response. - TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.memberResult(memberOne)); + assertFutureThrows(IllegalArgumentException.class, missingMemberResult.memberResult(memberOne)); assertNull(missingMemberResult.memberResult(memberTwo).get()); @@ -7761,8 +7809,8 @@ public class KafkaAdminClientTest { AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments); Future future1 = result1.all(); Future future2 = result1.values().get(tp1); - TestUtils.assertFutureThrows(UnknownServerException.class, future1); - TestUtils.assertFutureThrows(UnknownServerException.class, future2); + assertFutureThrows(UnknownServerException.class, future1); + assertFutureThrows(UnknownServerException.class, future2); // 2. NOT_CONTROLLER error handling AlterPartitionReassignmentsResponseData controllerErrResponseData = @@ -7813,7 +7861,7 @@ public class KafkaAdminClientTest { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(partitionLevelErrData)); AlterPartitionReassignmentsResult partitionLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); - TestUtils.assertFutureThrows(InvalidReplicaAssignmentException.class, partitionLevelErrResult.values().get(tp1)); + assertFutureThrows(InvalidReplicaAssignmentException.class, partitionLevelErrResult.values().get(tp1)); partitionLevelErrResult.values().get(tp2).get(); // 4. top-level error @@ -7832,9 +7880,9 @@ public class KafkaAdminClientTest { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(topLevelErrResponseData)); AlterPartitionReassignmentsResult topLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); - assertEquals(errorMessage, TestUtils.assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.all()).getMessage()); - assertEquals(errorMessage, TestUtils.assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp1)).getMessage()); - assertEquals(errorMessage, TestUtils.assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp2)).getMessage()); + assertEquals(errorMessage, assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.all()).getMessage()); + assertEquals(errorMessage, assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp1)).getMessage()); + assertEquals(errorMessage, assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp2)).getMessage()); // 5. unrepresentable topic name error TopicPartition invalidTopicTP = new TopicPartition("", 0); @@ -7853,8 +7901,8 @@ public class KafkaAdminClientTest { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(singlePartResponseData)); AlterPartitionReassignmentsResult unrepresentableTopicResult = env.adminClient().alterPartitionReassignments(invalidTopicReassignments); - TestUtils.assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidTopicTP)); - TestUtils.assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidPartitionTP)); + assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidTopicTP)); + assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidPartitionTP)); unrepresentableTopicResult.values().get(tp1).get(); // Test success scenario @@ -7923,7 +7971,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData)); ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(Set.of(tp1, tp2)); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, unknownTpResult.reassignments()); + assertFutureThrows(UnknownTopicOrPartitionException.class, unknownTpResult.reassignments()); // 3. Success ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() @@ -7974,7 +8022,7 @@ public class KafkaAdminClientTest { assertNull(result.all().get()); assertNull(result.partitionResult(tp1).get()); assertNull(result.partitionResult(tp2).get()); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); } } @@ -8006,7 +8054,7 @@ public class KafkaAdminClientTest { assertNull(result.all().get()); assertNull(result.partitionResult(tp1).get()); assertNull(result.partitionResult(tp2).get()); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); } } @@ -8118,8 +8166,8 @@ public class KafkaAdminClientTest { AlterConsumerGroupOffsetsResult errorResult = env.adminClient() .alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -8146,8 +8194,8 @@ public class KafkaAdminClientTest { AlterStreamsGroupOffsetsResult errorResult = env.adminClient() .alterStreamsGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -8229,8 +8277,8 @@ public class KafkaAdminClientTest { final AlterConsumerGroupOffsetsResult errorResult = env.adminClient() .alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -8251,8 +8299,8 @@ public class KafkaAdminClientTest { final AlterStreamsGroupOffsetsResult errorResult = env.adminClient() .alterStreamsGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -8426,7 +8474,7 @@ public class KafkaAdminClientTest { partitions.put(tp0, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all()); + assertFutureThrows(TopicAuthorizationException.class, result.all()); } } @@ -8454,7 +8502,7 @@ public class KafkaAdminClientTest { ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -8502,7 +8550,7 @@ public class KafkaAdminClientTest { put(tp1, OffsetSpec.latest()); }}); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); + assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); assertEquals(345L, tp1Offset.offset()); @@ -8564,7 +8612,7 @@ public class KafkaAdminClientTest { put(tp1, OffsetSpec.latest()); } }); - TestUtils.assertFutureThrows(TimeoutException.class, result.partitionResult(tp0)); + assertFutureThrows(TimeoutException.class, result.partitionResult(tp0)); ListOffsetsResultInfo tp1Result = result.partitionResult(tp1).get(); assertEquals(345L, tp1Result.offset()); assertEquals(543, tp1Result.leaderEpoch().get().intValue()); @@ -8627,7 +8675,7 @@ public class KafkaAdminClientTest { ListOffsetsResult result = env.adminClient().listOffsets( Collections.singletonMap(tp0, OffsetSpec.latest())); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); + assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); } } @@ -8954,56 +9002,56 @@ public class KafkaAdminClientTest { body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false, false)); KafkaFuture future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertFutureThrows(InvalidRequestException.class, future); // Test incorrect topic count env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, false, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test incorrect topic name env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, true, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test incorrect partition count env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, true, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test incorrect partition index env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test partition level error env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.INVALID_REQUEST, false, false, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertFutureThrows(InvalidRequestException.class, future); // Test all incorrect and no errors env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, true, true, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test all incorrect and both errors env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.INVALID_REQUEST, true, true, true, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertFutureThrows(InvalidRequestException.class, future); } } @@ -9218,7 +9266,7 @@ public class KafkaAdminClientTest { partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureThrows(expectedFailure, result.all()); + assertFutureThrows(expectedFailure, result.all()); } } @@ -9286,8 +9334,8 @@ public class KafkaAdminClientTest { partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); assertNotNull(result.partitionResult(tp0).get()); - TestUtils.assertFutureThrows(ApiException.class, result.partitionResult(tp1)); - TestUtils.assertFutureThrows(ApiException.class, result.all()); + assertFutureThrows(ApiException.class, result.partitionResult(tp1)); + assertFutureThrows(ApiException.class, result.all()); } } @@ -9406,7 +9454,7 @@ public class KafkaAdminClientTest { }, "Timed out waiting for Metadata request to be sent"); time.sleep(requestTimeoutMs + 1); - TestUtils.assertFutureThrows(TimeoutException.class, result.future); + assertFutureThrows(TimeoutException.class, result.future); } } @@ -9446,7 +9494,7 @@ public class KafkaAdminClientTest { // Now sleep the remaining time for the request timeout to expire time.sleep(60000); - TestUtils.assertFutureThrows(TimeoutException.class, result.future); + assertFutureThrows(TimeoutException.class, result.future); } } @@ -9536,8 +9584,8 @@ public class KafkaAdminClientTest { AlterClientQuotasResult result = env.adminClient().alterClientQuotas(entries); result.values().get(goodEntity); - TestUtils.assertFutureThrows(ClusterAuthorizationException.class, result.values().get(unauthorizedEntity)); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(invalidEntity)); + assertFutureThrows(ClusterAuthorizationException.class, result.values().get(unauthorizedEntity)); + assertFutureThrows(InvalidRequestException.class, result.values().get(invalidEntity)); // ensure immutable assertThrows(UnsupportedOperationException.class, () -> result.values().put(newClientQuotaEntity(ClientQuotaEntity.USER, "user-3"), null)); @@ -9576,7 +9624,7 @@ public class KafkaAdminClientTest { logDirs.put(tpr1, "/data1"); AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); assertNull(result.values().get(tpr0).get()); - TestUtils.assertFutureThrows(LogDirNotFoundException.class, result.values().get(tpr1)); + assertFutureThrows(LogDirNotFoundException.class, result.values().get(tpr1)); } } @@ -9607,7 +9655,7 @@ public class KafkaAdminClientTest { logDirs.put(tpr2, "/data1"); AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); assertNull(result.values().get(tpr1).get()); - TestUtils.assertFutureThrows(ApiException.class, result.values().get(tpr2)); + assertFutureThrows(ApiException.class, result.values().get(tpr2)); } } @@ -9643,7 +9691,7 @@ public class KafkaAdminClientTest { // Advance time past the default api timeout to time out the inflight request time.sleep(defaultApiTimeout + 1); - TestUtils.assertFutureThrows(TimeoutException.class, result.values().get(tpr1)); + assertFutureThrows(TimeoutException.class, result.values().get(tpr1)); assertNull(result.values().get(tpr2).get()); } } @@ -9831,7 +9879,7 @@ public class KafkaAdminClientTest { // Advance time past the default api timeout to time out the inflight request time.sleep(defaultApiTimeout + 1); - TestUtils.assertFutureThrows(TimeoutException.class, result.descriptions().get(0)); + assertFutureThrows(TimeoutException.class, result.descriptions().get(0)); assertNotNull(result.descriptions().get(1).get()); } } @@ -9890,7 +9938,7 @@ public class KafkaAdminClientTest { UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId); // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); } } @@ -9924,7 +9972,7 @@ public class KafkaAdminClientTest { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); } } @@ -9941,7 +9989,7 @@ public class KafkaAdminClientTest { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -9956,7 +10004,7 @@ public class KafkaAdminClientTest { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -10016,7 +10064,7 @@ public class KafkaAdminClientTest { "Future failed to timeout after expiration of timeout"); assertTrue(result.all().isCompletedExceptionally()); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); assertFalse(env.kafkaClient().hasInFlightRequests()); } } @@ -10754,7 +10802,7 @@ public class KafkaAdminClientTest { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -10818,7 +10866,7 @@ public class KafkaAdminClientTest { Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions()); assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -10964,7 +11012,7 @@ public class KafkaAdminClientTest { options); assertNotNull(result.all()); if (fail) { - TestUtils.assertFutureThrows(DuplicateVoterException.class, result.all()); + assertFutureThrows(DuplicateVoterException.class, result.all()); } else { result.all().get(); } @@ -11049,7 +11097,7 @@ public class KafkaAdminClientTest { options); assertNotNull(result.all()); if (fail) { - TestUtils.assertFutureThrows(VoterNotFoundException.class, result.all()); + assertFutureThrows(VoterNotFoundException.class, result.all()); } else { result.all().get(); } @@ -11373,7 +11421,7 @@ public class KafkaAdminClientTest { assertNull(result.partitionResult(fooTopicPartition0).get()); assertNull(result.partitionResult(fooTopicPartition1).get()); assertNull(result.partitionResult(barPartition0).get()); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); } } @@ -11393,9 +11441,9 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data)); final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L)); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.partitionResult(fooTopicPartition1)); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); + assertFutureThrows(GroupAuthorizationException.class, result.all()); + assertFutureThrows(GroupAuthorizationException.class, result.partitionResult(fooTopicPartition1)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); } } @@ -11420,9 +11468,9 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data)); final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L)); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all()); + assertFutureThrows(TopicAuthorizationException.class, result.all()); assertNull(result.partitionResult(fooTopicPartition0).get()); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.partitionResult(fooTopicPartition1)); + assertFutureThrows(TopicAuthorizationException.class, result.partitionResult(fooTopicPartition1)); assertNull(result.partitionResult(barPartition0).get()); } } @@ -11519,7 +11567,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data)); final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName)); - TestUtils.assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(), result.all()); + assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(), result.all()); } } @@ -11550,8 +11598,8 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data)); final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName)); - TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.all()); - TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.topicResult(fooName)); + assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.all()); + assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.topicResult(fooName)); assertNull(result.topicResult(barName).get()); } }