mirror of https://github.com/apache/kafka.git
KAFKA-18101: Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals (#17991)
Reviewers: Ziming Deng<dengziming1993@gmail.com>, Chia-Ping Tsai<chia7712@gmail.com>, TaiJuWu<tjwu1217@gmail.com>.
This commit is contained in:
parent
dbae448a05
commit
095bd0a6d4
|
@ -17,7 +17,6 @@ import java.util
|
|||
import java.util.{Collections, Properties}
|
||||
import kafka.integration.KafkaServerTestHarness
|
||||
import kafka.server.KafkaConfig
|
||||
import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
|
||||
import kafka.utils.{Logging, TestUtils}
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
|
||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsOptions, ConfigEntry}
|
||||
|
@ -28,6 +27,7 @@ import org.apache.kafka.network.SocketServerConfigs
|
|||
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.policy.AlterConfigPolicy
|
||||
import org.apache.kafka.storage.internals.log.LogConfig
|
||||
import org.apache.kafka.test.TestUtils.assertFutureThrows
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
|
@ -155,10 +155,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
|
|||
alterResult = client.incrementalAlterConfigs(alterConfigs)
|
||||
|
||||
assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
|
||||
assertFutureThrows(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
|
||||
alterResult.values.get(topicResource2).get
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
assertFutureThrows(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
|
||||
assertFutureThrows(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
assertTrue(validationsForResource(brokerResource).isEmpty,
|
||||
"Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
|
||||
validations.clear()
|
||||
|
@ -184,10 +184,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
|
|||
alterResult = client.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true))
|
||||
|
||||
assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
|
||||
assertFutureThrows(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
|
||||
alterResult.values.get(topicResource2).get
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
assertFutureThrows(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
|
||||
assertFutureThrows(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
assertTrue(validationsForResource(brokerResource).isEmpty,
|
||||
"Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
|
||||
validations.clear()
|
||||
|
|
|
@ -1601,7 +1601,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
def testDescribeGroupApiWithNoGroupAcl(quorum: String): Unit = {
|
||||
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
|
||||
val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), classOf[GroupAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.describedGroups().get(group), classOf[GroupAuthorizationException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -1687,14 +1687,14 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
consumer.assign(List(tp).asJava)
|
||||
consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
|
||||
val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("kraft"))
|
||||
def testDeleteGroupApiWithNoDeleteGroupAcl2(quorum: String): Unit = {
|
||||
val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
|
@ -1725,7 +1725,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
|
||||
consumer.close()
|
||||
val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava)
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[GroupAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.all(), classOf[GroupAuthorizationException])
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
|
@ -1745,15 +1745,15 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)), groupResource)
|
||||
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
|
||||
val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava)
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[TopicAuthorizationException])
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.partitionResult(tp), classOf[TopicAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.all(), classOf[TopicAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.partitionResult(tp), classOf[TopicAuthorizationException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("kraft"))
|
||||
def testDeleteGroupOffsetsWithNoAcl(quorum: String): Unit = {
|
||||
val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava)
|
||||
TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[GroupAuthorizationException])
|
||||
JTestUtils.assertFutureThrows(result.all(), classOf[GroupAuthorizationException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType
|
|||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||
import org.apache.kafka.security.authorizer.AclEntry
|
||||
import org.apache.kafka.test.TestUtils.assertFutureThrows
|
||||
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||
|
@ -108,14 +109,14 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
|
|||
val failedCreateResult = client.createTopics(newTopics.asJava)
|
||||
val results = failedCreateResult.values()
|
||||
assertTrue(results.containsKey("mytopic"))
|
||||
assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
|
||||
assertFutureThrows(results.get("mytopic"), classOf[TopicExistsException])
|
||||
assertTrue(results.containsKey("mytopic2"))
|
||||
assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
|
||||
assertFutureThrows(results.get("mytopic2"), classOf[TopicExistsException])
|
||||
assertTrue(results.containsKey("mytopic3"))
|
||||
assertFutureExceptionTypeEquals(results.get("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureExceptionTypeEquals(failedCreateResult.numPartitions("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureExceptionTypeEquals(failedCreateResult.replicationFactor("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureExceptionTypeEquals(failedCreateResult.config("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureThrows(results.get("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureThrows(failedCreateResult.numPartitions("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureThrows(failedCreateResult.replicationFactor("mytopic3"), classOf[TopicExistsException])
|
||||
assertFutureThrows(failedCreateResult.config("mytopic3"), classOf[TopicExistsException])
|
||||
|
||||
val topicToDescription = client.describeTopics(topics.asJava).allTopicNames.get()
|
||||
assertEquals(topics.toSet, topicToDescription.keySet.asScala)
|
||||
|
|
|
@ -56,7 +56,7 @@ import org.apache.kafka.network.SocketServerConfigs
|
|||
import org.apache.kafka.security.authorizer.AclEntry
|
||||
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs, ZkConfigs}
|
||||
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
|
||||
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
|
||||
import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows}
|
||||
import org.apache.log4j.PropertyConfigurator
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||
|
@ -726,7 +726,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
val nonExistingTopic = "non-existing"
|
||||
val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).topicNameValues()
|
||||
assertEquals(existingTopic, results.get(existingTopic).get.name)
|
||||
assertFutureExceptionTypeEquals(results.get(nonExistingTopic), classOf[UnknownTopicOrPartitionException])
|
||||
assertFutureThrows(results.get(nonExistingTopic), classOf[UnknownTopicOrPartitionException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -745,7 +745,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
|
||||
val results = client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId, nonExistingTopicId).asJava)).topicIdValues()
|
||||
assertEquals(existingTopicId, results.get(existingTopicId).get.topicId())
|
||||
assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), classOf[UnknownTopicIdException])
|
||||
assertFutureThrows(results.get(nonExistingTopicId), classOf[UnknownTopicIdException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -1104,8 +1104,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
groupResource -> groupAlterConfigs
|
||||
).asJava, new AlterConfigsOptions().validateOnly(true))
|
||||
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(groupResource), classOf[InvalidConfigurationException],
|
||||
Some("consumer.session.timeout.ms must be greater than or equal to group.consumer.min.session.timeout.ms"))
|
||||
assertFutureThrows(alterResult.values.get(groupResource), classOf[InvalidConfigurationException],
|
||||
"consumer.session.timeout.ms must be greater than or equal to group.consumer.min.session.timeout.ms")
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -1707,10 +1707,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
|
||||
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
|
||||
client = createAdminClient
|
||||
assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException])
|
||||
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl)).all(),
|
||||
assertFutureThrows(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException])
|
||||
assertFutureThrows(client.createAcls(Collections.singleton(acl)).all(),
|
||||
classOf[SecurityDisabledException])
|
||||
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(acl.toFilter())).all(),
|
||||
assertFutureThrows(client.deleteAcls(Collections.singleton(acl.toFilter())).all(),
|
||||
classOf[SecurityDisabledException])
|
||||
}
|
||||
|
||||
|
@ -1727,7 +1727,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
|
||||
client.close(time.Duration.ofHours(2))
|
||||
val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
|
||||
assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException])
|
||||
assertFutureThrows(future2, classOf[IllegalStateException])
|
||||
future.get
|
||||
client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect
|
||||
}
|
||||
|
@ -1747,7 +1747,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,
|
||||
new CreateTopicsOptions().timeoutMs(900000)).all()
|
||||
client.close(time.Duration.ZERO)
|
||||
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
|
||||
assertFutureThrows(future, classOf[TimeoutException])
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1764,7 +1764,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
val startTimeMs = Time.SYSTEM.milliseconds()
|
||||
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,
|
||||
new CreateTopicsOptions().timeoutMs(2)).all()
|
||||
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
|
||||
assertFutureThrows(future, classOf[TimeoutException])
|
||||
val endTimeMs = Time.SYSTEM.milliseconds()
|
||||
assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least one millisecond.")
|
||||
}
|
||||
|
@ -1782,7 +1782,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
|
||||
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,
|
||||
new CreateTopicsOptions().validateOnly(true)).all()
|
||||
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
|
||||
assertFutureThrows(future, classOf[TimeoutException])
|
||||
val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1.toShort)).asJava,
|
||||
new CreateTopicsOptions().validateOnly(true)).all()
|
||||
future2.get
|
||||
|
@ -1982,9 +1982,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
Collections.singleton(new MemberToRemove(invalidInstanceId))
|
||||
))
|
||||
|
||||
TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, classOf[UnknownMemberIdException])
|
||||
assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException])
|
||||
val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
|
||||
TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, classOf[UnknownMemberIdException])
|
||||
assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException])
|
||||
|
||||
// Test consumer group deletion
|
||||
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
|
||||
|
@ -1992,12 +1992,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
|
||||
// Deleting the fake group ID should get GroupIdNotFoundException.
|
||||
assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
|
||||
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
|
||||
assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
|
||||
classOf[GroupIdNotFoundException])
|
||||
|
||||
// Deleting the real group ID should get GroupNotEmptyException
|
||||
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
|
||||
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
|
||||
assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
|
||||
classOf[GroupNotEmptyException])
|
||||
|
||||
// Test delete one correct static member
|
||||
|
@ -2255,9 +2255,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
Collections.singleton(new MemberToRemove(invalidInstanceId))
|
||||
))
|
||||
|
||||
TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, classOf[UnknownMemberIdException])
|
||||
assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException])
|
||||
val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
|
||||
TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, classOf[UnknownMemberIdException])
|
||||
assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException])
|
||||
|
||||
// Test consumer group deletion
|
||||
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
|
||||
|
@ -2265,12 +2265,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
|
||||
// Deleting the fake group ID should get GroupIdNotFoundException.
|
||||
assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
|
||||
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
|
||||
assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
|
||||
classOf[GroupIdNotFoundException])
|
||||
|
||||
// Deleting the real group ID should get GroupNotEmptyException
|
||||
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
|
||||
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
|
||||
assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
|
||||
classOf[GroupNotEmptyException])
|
||||
|
||||
// Test delete one correct static member
|
||||
|
@ -2379,29 +2379,29 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
|
||||
|
||||
// Top level error will equal to the first partition level error
|
||||
assertFutureExceptionTypeEquals(offsetDeleteResult.all(), classOf[GroupSubscribedToTopicException])
|
||||
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp1),
|
||||
assertFutureThrows(offsetDeleteResult.all(), classOf[GroupSubscribedToTopicException])
|
||||
assertFutureThrows(offsetDeleteResult.partitionResult(tp1),
|
||||
classOf[GroupSubscribedToTopicException])
|
||||
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
|
||||
assertFutureThrows(offsetDeleteResult.partitionResult(tp2),
|
||||
classOf[UnknownTopicOrPartitionException])
|
||||
|
||||
// Test the fake group ID
|
||||
val fakeDeleteResult = client.deleteConsumerGroupOffsets(fakeGroupId, Set(tp1, tp2).asJava)
|
||||
|
||||
assertFutureExceptionTypeEquals(fakeDeleteResult.all(), classOf[GroupIdNotFoundException])
|
||||
assertFutureExceptionTypeEquals(fakeDeleteResult.partitionResult(tp1),
|
||||
assertFutureThrows(fakeDeleteResult.all(), classOf[GroupIdNotFoundException])
|
||||
assertFutureThrows(fakeDeleteResult.partitionResult(tp1),
|
||||
classOf[GroupIdNotFoundException])
|
||||
assertFutureExceptionTypeEquals(fakeDeleteResult.partitionResult(tp2),
|
||||
assertFutureThrows(fakeDeleteResult.partitionResult(tp2),
|
||||
classOf[GroupIdNotFoundException])
|
||||
}
|
||||
|
||||
// Test offset deletion when group is empty
|
||||
val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
|
||||
|
||||
assertFutureExceptionTypeEquals(offsetDeleteResult.all(),
|
||||
assertFutureThrows(offsetDeleteResult.all(),
|
||||
classOf[UnknownTopicOrPartitionException])
|
||||
assertNull(offsetDeleteResult.partitionResult(tp1).get())
|
||||
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
|
||||
assertFutureThrows(offsetDeleteResult.partitionResult(tp2),
|
||||
classOf[UnknownTopicOrPartitionException])
|
||||
} finally {
|
||||
Utils.closeQuietly(client, "adminClient")
|
||||
|
@ -3213,8 +3213,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
topic1Resource -> topic1AlterConfigs
|
||||
).asJava, new AlterConfigsOptions().validateOnly(true))
|
||||
|
||||
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
|
||||
Some("Invalid value zip for configuration compression.type"))
|
||||
assertFutureThrows(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
|
||||
"Invalid value zip for configuration compression.type: String must be one of: uncompressed, zstd, lz4, snappy, gzip, producer")
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -3362,8 +3362,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
|
||||
|
||||
// InvalidRequestException error for topic1
|
||||
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
|
||||
Some("Error due to duplicate config keys"))
|
||||
assertFutureThrows(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
|
||||
"Error due to duplicate config keys")
|
||||
|
||||
// Operation should succeed for topic2
|
||||
alterResult.values().get(topic2Resource).get()
|
||||
|
@ -3393,11 +3393,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
).asJava)
|
||||
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
|
||||
|
||||
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
|
||||
Some("Can't APPEND to key compression.type because its type is not LIST."))
|
||||
assertFutureThrows(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
|
||||
"Can't APPEND to key compression.type because its type is not LIST.")
|
||||
|
||||
assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidConfigurationException],
|
||||
Some("Can't SUBTRACT to key compression.type because its type is not LIST."))
|
||||
assertFutureThrows(alterResult.values().get(topic2Resource), classOf[InvalidConfigurationException],
|
||||
"Can't SUBTRACT to key compression.type because its type is not LIST.")
|
||||
|
||||
// Try to add invalid config
|
||||
topic1AlterConfigs = Seq(
|
||||
|
@ -3409,8 +3409,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
).asJava)
|
||||
assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
|
||||
|
||||
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
|
||||
Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1"))
|
||||
assertFutureThrows(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
|
||||
"Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1")
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -3437,8 +3437,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
nonExistentTp1 -> validAssignment,
|
||||
nonExistentTp2 -> validAssignment
|
||||
).asJava).values()
|
||||
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException])
|
||||
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException])
|
||||
assertFutureThrows(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException])
|
||||
assertFutureThrows(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException])
|
||||
|
||||
val extraNonExistentReplica = Optional.of(new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava))
|
||||
val negativeIdReplica = Optional.of(new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava))
|
||||
|
@ -3448,9 +3448,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
tp2 -> negativeIdReplica,
|
||||
tp3 -> duplicateReplica
|
||||
).asJava).values()
|
||||
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException])
|
||||
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException])
|
||||
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
|
||||
assertFutureThrows(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException])
|
||||
assertFutureThrows(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException])
|
||||
assertFutureThrows(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -3465,8 +3465,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
assertTrue(results.containsKey(longTopicName))
|
||||
results.get(longTopicName).get()
|
||||
assertTrue(results.containsKey(invalidTopicName))
|
||||
assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException])
|
||||
assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
|
||||
assertFutureThrows(results.get(invalidTopicName), classOf[InvalidTopicException])
|
||||
assertFutureThrows(client.alterReplicaLogDirs(
|
||||
Map(new TopicPartitionReplica(longTopicName, 0, 0) -> brokers(0).config.logDirs(0)).asJava).all(),
|
||||
classOf[InvalidTopicException])
|
||||
client.close()
|
||||
|
@ -3493,10 +3493,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
TopicConfig.COMPRESSION_TYPE_CONFIG -> "producer"
|
||||
).asJava
|
||||
val newTopic = new NewTopic(topic, 2, brokerCount.toShort)
|
||||
assertFutureExceptionTypeEquals(
|
||||
assertFutureThrows(
|
||||
client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all,
|
||||
classOf[InvalidConfigurationException],
|
||||
Some("Null value not supported for topic configs: retention.bytes")
|
||||
"Null value not supported for topic configs: retention.bytes"
|
||||
)
|
||||
|
||||
val validConfigs = Map[String, String](TopicConfig.COMPRESSION_TYPE_CONFIG -> "producer").asJava
|
||||
|
@ -3509,10 +3509,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, null), AlterConfigOp.OpType.SET),
|
||||
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), AlterConfigOp.OpType.SET)
|
||||
)
|
||||
assertFutureExceptionTypeEquals(
|
||||
assertFutureThrows(
|
||||
client.incrementalAlterConfigs(Map(topicResource -> alterOps.asJavaCollection).asJava).all,
|
||||
classOf[InvalidRequestException],
|
||||
Some("Null value not supported for : retention.bytes")
|
||||
"Null value not supported for : retention.bytes"
|
||||
)
|
||||
validateLogConfig(compressionType = "producer")
|
||||
}
|
||||
|
@ -3932,9 +3932,9 @@ object PlaintextAdminIntegrationTest {
|
|||
var alterResult = admin.incrementalAlterConfigs(alterConfigs)
|
||||
|
||||
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException])
|
||||
assertFutureThrows(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException])
|
||||
alterResult.values.get(topicResource2).get
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
assertFutureThrows(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
|
||||
// Verify that first and third resources were not updated and second was updated
|
||||
test.ensureConsistentKRaftMetadata()
|
||||
|
@ -3961,9 +3961,9 @@ object PlaintextAdminIntegrationTest {
|
|||
alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true))
|
||||
|
||||
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException])
|
||||
assertFutureThrows(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException])
|
||||
alterResult.values.get(topicResource2).get
|
||||
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
assertFutureThrows(alterResult.values.get(brokerResource), classOf[InvalidRequestException])
|
||||
|
||||
// Verify that no resources are updated since validate_only = true
|
||||
test.ensureConsistentKRaftMetadata()
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConf
|
|||
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
|
||||
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
|
||||
import org.apache.kafka.storage.internals.log.LogConfig
|
||||
import org.apache.kafka.test.TestUtils.assertFutureThrows
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
|
@ -216,7 +217,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
|||
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
|
||||
val results2 = client.createAcls(List(aclUnknown).asJava)
|
||||
assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
|
||||
assertFutureExceptionTypeEquals(results2.all, classOf[InvalidRequestException])
|
||||
assertFutureThrows(results2.all, classOf[InvalidRequestException])
|
||||
val results3 = client.deleteAcls(List(acl.toFilter, acl2.toFilter, acl3.toFilter).asJava).values
|
||||
assertEquals(Set(acl.toFilter, acl2.toFilter, acl3.toFilter), results3.keySet.asScala)
|
||||
assertEquals(0, results3.get(acl.toFilter).get.values.size())
|
||||
|
@ -403,8 +404,8 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
|||
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
|
||||
val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
|
||||
assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala)
|
||||
assertFutureExceptionTypeEquals(results.values.get(clusterAcl), classOf[InvalidRequestException])
|
||||
assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
|
||||
assertFutureThrows(results.values.get(clusterAcl), classOf[InvalidRequestException])
|
||||
assertFutureThrows(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -601,9 +602,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
|||
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, compressionConfig.value)
|
||||
assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, compressionConfig.source)
|
||||
|
||||
assertFutureExceptionTypeEquals(result.numPartitions(topic2), classOf[TopicAuthorizationException])
|
||||
assertFutureExceptionTypeEquals(result.replicationFactor(topic2), classOf[TopicAuthorizationException])
|
||||
assertFutureExceptionTypeEquals(result.config(topic2), classOf[TopicAuthorizationException])
|
||||
assertFutureThrows(result.numPartitions(topic2), classOf[TopicAuthorizationException])
|
||||
assertFutureThrows(result.replicationFactor(topic2), classOf[TopicAuthorizationException])
|
||||
assertFutureThrows(result.config(topic2), classOf[TopicAuthorizationException])
|
||||
}
|
||||
validateMetadataAndConfigs(validateResult)
|
||||
|
||||
|
@ -614,7 +615,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
|||
val topicIds = getTopicIds()
|
||||
assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get())
|
||||
assertEquals(topicIds(topic1), createResult.topicId(topic1).get())
|
||||
assertFutureExceptionTypeEquals(createResult.topicId(topic2), classOf[TopicAuthorizationException])
|
||||
assertFutureThrows(createResult.topicId(topic2), classOf[TopicAuthorizationException])
|
||||
|
||||
val createResponseConfig = createResult.config(topic1).get().entries.asScala
|
||||
|
||||
|
@ -637,7 +638,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
|||
val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxLifetimeMs(5000)
|
||||
|
||||
// Test expiration for non-exists token
|
||||
TestUtils.assertFutureExceptionTypeEquals(
|
||||
assertFutureThrows(
|
||||
client.expireDelegationToken("".getBytes()).expiryTimestamp(),
|
||||
classOf[DelegationTokenNotFoundException]
|
||||
)
|
||||
|
@ -650,7 +651,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
|||
val token2 = client.createDelegationToken(createDelegationTokenOptions.maxLifetimeMs(1000)).delegationToken().get()
|
||||
// Ensure current time > maxLifeTimeMs of token
|
||||
Thread.sleep(1000)
|
||||
TestUtils.assertFutureExceptionTypeEquals(
|
||||
assertFutureThrows(
|
||||
client.expireDelegationToken(token2.hmac(), new ExpireDelegationTokenOptions().expiryTimePeriodMs(1)).expiryTimestamp(),
|
||||
classOf[DelegationTokenExpiredException]
|
||||
)
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
|
|||
import org.apache.kafka.coordinator.group.GroupConfig
|
||||
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
|
||||
import org.apache.kafka.storage.internals.log.LogConfig
|
||||
import org.apache.kafka.test.TestUtils.assertFutureThrows
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.{Test, Timeout}
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
|
@ -320,7 +321,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
|
|||
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
|
||||
val op = new AlterConfigOp(new ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "200000"), OpType.SET)
|
||||
val future = admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all
|
||||
TestUtils.assertFutureExceptionTypeEquals(future, classOf[InvalidRequestException])
|
||||
assertFutureThrows(future, classOf[InvalidRequestException])
|
||||
} finally {
|
||||
admin.close()
|
||||
}
|
||||
|
@ -471,7 +472,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
|
|||
val resource = new ConfigResource(ConfigResource.Type.GROUP, "")
|
||||
val op = new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "200000"), OpType.SET)
|
||||
val future = admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all
|
||||
TestUtils.assertFutureExceptionTypeEquals(future, classOf[InvalidRequestException])
|
||||
assertFutureThrows(future, classOf[InvalidRequestException])
|
||||
} finally {
|
||||
admin.close()
|
||||
}
|
||||
|
|
|
@ -1558,15 +1558,6 @@ object TestUtils extends Logging {
|
|||
)
|
||||
}
|
||||
|
||||
def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable],
|
||||
expectedErrorMessage: Option[String] = None): Unit = {
|
||||
val cause = assertThrows(classOf[ExecutionException], () => future.get()).getCause
|
||||
assertTrue(clazz.isInstance(cause), "Expected an exception of type " + clazz.getName + "; got type " +
|
||||
cause.getClass.getName)
|
||||
expectedErrorMessage.foreach(message => assertTrue(cause.getMessage.contains(message), s"Received error message : ${cause.getMessage}" +
|
||||
s" does not contain expected error message : $message"))
|
||||
}
|
||||
|
||||
def assertBadConfigContainingMessage(props: Properties, expectedExceptionContainsText: String): Unit = {
|
||||
try {
|
||||
KafkaConfig.fromProps(props)
|
||||
|
|
Loading…
Reference in New Issue