diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java index 9621e34ef5b..cb23e6aaf28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java @@ -26,4 +26,9 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) { super(type, deadlineMs); } + + @Override + public boolean requireSubscriptionMetadata() { + return true; + } } diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 0735829a0b1..03a987c54b4 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -108,12 +108,12 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { createProducer(configOverrides = prop) else producer - verifyWithRetry(sendOneRecord(producer2)) + verifyWithRetry(sendOneRecord(producer2))() } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) - def testTransactionalProducerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testTransactionalProducerWithAuthenticationFailure(quorum: String): Unit = { val txProducer = createTransactionalProducer() verifyAuthenticationException(txProducer.initTransactions()) @@ -122,7 +122,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() consumer.subscribe(List(topic).asJava) @@ -130,7 +130,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() consumer.assign(List(tp).asJava) @@ -138,7 +138,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) val consumer = createConsumer() @@ -153,13 +153,13 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { createClientCredential() val producer = createProducer() - verifyWithRetry(sendOneRecord(producer)) - verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count)) + verifyWithRetry(sendOneRecord(producer))() + verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1) } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) - def testKafkaAdminClientWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testKafkaAdminClientWithAuthenticationFailure(quorum: String): Unit = { val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties)) props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) val adminClient = Admin.create(props) @@ -180,7 +180,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { verifyAuthenticationException(describeTopic()) createClientCredential() - verifyWithRetry(describeTopic()) + verifyWithRetry(describeTopic())() } finally { adminClient.close() } @@ -209,13 +209,12 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs") } - private def verifyWithRetry(action: => Unit): Unit = { + private def verifyWithRetry[T](operation: => T)(predicate: T => Boolean = (_: T) => true): Unit = { var attempts = 0 TestUtils.waitUntilTrue(() => { try { attempts += 1 - action - true + predicate(operation) } catch { case _: SaslAuthenticationException => false } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java index cd4198c7c79..4aad4af5751 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java @@ -136,24 +136,26 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name. @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}") - @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") + @MethodSource("getTestQuorumAndGroupProtocolParametersAll") public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, String groupProtocol) throws Exception { - ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); - try (Consumer consumer = createConsumer()) { + try ( + ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); + Consumer consumer = createConsumer() + ) { consumer.subscribe(Collections.singletonList(TOPIC)); - verifyAuthenticationException(consumerGroupService::listGroups); - } finally { - consumerGroupService.close(); } } + // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name. @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}") - @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") + @MethodSource("getTestQuorumAndGroupProtocolParametersAll") public void testConsumerGroupServiceWithAuthenticationSuccess(String quorum, String groupProtocol) throws Exception { createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2); - ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); - try (Consumer consumer = createConsumer()) { + try ( + ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); + Consumer consumer = createConsumer() + ) { consumer.subscribe(Collections.singletonList(TOPIC)); TestUtils.waitForCondition(() -> { @@ -165,8 +167,6 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } }, "failed to poll data with authentication"); assertEquals(1, consumerGroupService.listConsumerGroups().size()); - } finally { - consumerGroupService.close(); } }